Posted to commits@kafka.apache.org by da...@apache.org on 2021/03/04 21:04:29 UTC

[kafka] branch 2.8 updated (360ed18 -> b8ba3c0)

This is an automated email from the ASF dual-hosted git repository.

davidarthur pushed a change to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git.


    from 360ed18  MINOR: Enable topic deletion in the KIP-500 controller (#10184)
     new d69212a  KAFKA-12376: Apply atomic append to the log (#10253)
     new b8ba3c0  HOTFIX: Controller topic deletion should be atomic (#10264)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 core/src/main/scala/kafka/raft/RaftManager.scala   | 29 ++++++--
 .../controller/ClientQuotaControlManager.java      |  2 +-
 .../kafka/controller/ClusterControlManager.java    |  2 +-
 .../controller/ConfigurationControlManager.java    |  4 +-
 .../apache/kafka/controller/ControllerResult.java  | 38 ++++++----
 .../controller/ControllerResultAndOffset.java      | 32 ++++-----
 .../kafka/controller/FeatureControlManager.java    |  3 +-
 .../apache/kafka/controller/QuorumController.java  | 23 +++---
 .../controller/ReplicationControlManager.java      | 14 ++--
 .../org/apache/kafka/metalog/LocalLogManager.java  | 17 ++++-
 .../org/apache/kafka/metalog/MetaLogManager.java   | 23 +++++-
 .../ConfigurationControlManagerTest.java           | 83 ++++++++++++++++------
 .../controller/FeatureControlManagerTest.java      | 58 ++++++++++-----
 .../controller/ReplicationControlManagerTest.java  | 24 ++++---
 .../org/apache/kafka/metalog/LocalLogManager.java  | 17 ++++-
 .../kafka/raft/metadata/MetaLogRaftShim.java       | 28 +++++++-
 16 files changed, 282 insertions(+), 115 deletions(-)
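
Taken together, the two commits introduce a distinction between metadata records that may be appended to the log in separate batches and records that must be appended all-or-nothing: controller operations now mark their results as atomic where needed, and the Raft/metalog layer gains matching atomic-append entry points. A minimal, self-contained Java sketch of the result side of that pattern follows; the class below is an illustrative stand-in, not the package-private org.apache.kafka.controller.ControllerResult.

    import java.util.Collections;
    import java.util.List;

    // Illustrative stand-in for the controller's result type; records are modeled
    // as plain Strings so the sketch compiles on its own.
    final class ResultSketch<T> {
        private final List<String> records;   // metadata records to append to the log
        private final T response;             // response handed back to the caller
        private final boolean isAtomic;       // true: append all records in one batch or none

        private ResultSketch(List<String> records, T response, boolean isAtomic) {
            this.records = Collections.unmodifiableList(records);
            this.response = response;
            this.isAtomic = isAtomic;
        }

        // For operations whose records must land together (topic create/delete,
        // config and feature updates in these commits).
        static <T> ResultSketch<T> atomicOf(List<String> records, T response) {
            return new ResultSketch<>(records, response, true);
        }

        // For operations whose records may be split across batches.
        static <T> ResultSketch<T> of(List<String> records, T response) {
            return new ResultSketch<>(records, response, false);
        }

        List<String> records() { return records; }
        T response() { return response; }
        boolean isAtomic() { return isAtomic; }
    }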


[kafka] 02/02: HOTFIX: Controller topic deletion should be atomic (#10264)

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davidarthur pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit b8ba3c04db6a4e53d4904918774b19346ad535c8
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Thu Mar 4 12:19:34 2021 -0800

    HOTFIX: Controller topic deletion should be atomic (#10264)
    
    Topic deletions should be atomic. This fixes a build error caused by the near-simultaneous merging of https://github.com/apache/kafka/pull/10253 and https://github.com/apache/kafka/pull/10184.
    
    Reviewers: David Arthur <mu...@gmail.com>
---
 .../controller/ReplicationControlManager.java      |  2 +-
 .../controller/ReplicationControlManagerTest.java  | 24 ++++++++++++----------
 2 files changed, 14 insertions(+), 12 deletions(-)

diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index 8bc0670..4a58b3a 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -624,7 +624,7 @@ public class ReplicationControlManager {
                 results.put(id, ApiError.fromThrowable(e));
             }
         }
-        return new ControllerResult<>(records, results);
+        return ControllerResult.atomicOf(records, results);
     }
 
     void deleteTopic(Uuid id, List<ApiMessageAndVersion> records) {
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index da6e4af..8b00c10 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -67,6 +67,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 
 @Timeout(40)
@@ -451,19 +452,19 @@ public class ReplicationControlManagerTest {
         unfenceBroker(0, ctx);
         registerBroker(1, ctx);
         unfenceBroker(1, ctx);
-        ControllerResult<CreateTopicsResponseData> result =
+        ControllerResult<CreateTopicsResponseData> createResult =
             replicationControl.createTopics(request);
         CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData();
-        Uuid topicId = result.response().topics().find("foo").topicId();
+        Uuid topicId = createResult.response().topics().find("foo").topicId();
         expectedResponse.topics().add(new CreatableTopicResult().setName("foo").
             setNumPartitions(3).setReplicationFactor((short) 2).
             setErrorMessage(null).setErrorCode((short) 0).
             setTopicId(topicId));
-        assertEquals(expectedResponse, result.response());
+        assertEquals(expectedResponse, createResult.response());
         // Until the records are replayed, no changes are made
         assertNull(replicationControl.getPartition(topicId, 0));
         assertEmptyTopicConfigs(ctx, "foo");
-        ctx.replay(result.records());
+        ctx.replay(createResult.records());
         assertNotNull(replicationControl.getPartition(topicId, 0));
         assertNotNull(replicationControl.getPartition(topicId, 1));
         assertNotNull(replicationControl.getPartition(topicId, 2));
@@ -483,17 +484,18 @@ public class ReplicationControlManagerTest {
             new ResultOrError<>(new ApiError(UNKNOWN_TOPIC_OR_PARTITION))),
                 replicationControl.findTopicIds(Long.MAX_VALUE, Collections.singleton("bar")));
 
-        ControllerResult<Map<Uuid, ApiError>> result1 = replicationControl.
+        ControllerResult<Map<Uuid, ApiError>> invalidDeleteResult = replicationControl.
             deleteTopics(Collections.singletonList(invalidId));
-        assertEquals(0, result1.records().size());
+        assertEquals(0, invalidDeleteResult.records().size());
         assertEquals(Collections.singletonMap(invalidId, new ApiError(UNKNOWN_TOPIC_ID, null)),
-            result1.response());
-        ControllerResult<Map<Uuid, ApiError>> result2 = replicationControl.
+            invalidDeleteResult.response());
+        ControllerResult<Map<Uuid, ApiError>> deleteResult = replicationControl.
             deleteTopics(Collections.singletonList(topicId));
+        assertTrue(deleteResult.isAtomic());
         assertEquals(Collections.singletonMap(topicId, new ApiError(NONE, null)),
-            result2.response());
-        assertEquals(1, result2.records().size());
-        ctx.replay(result2.records());
+            deleteResult.response());
+        assertEquals(1, deleteResult.records().size());
+        ctx.replay(deleteResult.records());
         assertNull(replicationControl.getPartition(topicId, 0));
         assertNull(replicationControl.getPartition(topicId, 1));
         assertNull(replicationControl.getPartition(topicId, 2));
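
The build error the hotfix addresses is visible from the two diffs in this push: commit 01/02 removed the public ControllerResult constructors in favor of the of/atomicOf factories, while the deleteTopics path added by #10184 still invoked a constructor directly. The fix switches to the factory and, because a topic deletion must not be half-applied, picks the atomic variant:

    // Before (no longer compiles once the constructor became protected in #10253):
    //     return new ControllerResult<>(records, results);
    // After (this hotfix), flagging the deletion records as an all-or-nothing batch:
    //     return ControllerResult.atomicOf(records, results);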


[kafka] 01/02: KAFKA-12376: Apply atomic append to the log (#10253)

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davidarthur pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit d69212a42246eeaa123e0a574d08ba44e37e4a1d
Author: José Armando García Sancio <js...@users.noreply.github.com>
AuthorDate: Thu Mar 4 10:55:43 2021 -0800

    KAFKA-12376: Apply atomic append to the log (#10253)
---
 core/src/main/scala/kafka/raft/RaftManager.scala   | 29 ++++++--
 .../controller/ClientQuotaControlManager.java      |  2 +-
 .../kafka/controller/ClusterControlManager.java    |  2 +-
 .../controller/ConfigurationControlManager.java    |  4 +-
 .../apache/kafka/controller/ControllerResult.java  | 38 ++++++----
 .../controller/ControllerResultAndOffset.java      | 32 ++++-----
 .../kafka/controller/FeatureControlManager.java    |  3 +-
 .../apache/kafka/controller/QuorumController.java  | 23 +++---
 .../controller/ReplicationControlManager.java      | 12 ++--
 .../org/apache/kafka/metalog/LocalLogManager.java  | 17 ++++-
 .../org/apache/kafka/metalog/MetaLogManager.java   | 23 +++++-
 .../ConfigurationControlManagerTest.java           | 83 ++++++++++++++++------
 .../controller/FeatureControlManagerTest.java      | 58 ++++++++++-----
 .../org/apache/kafka/metalog/LocalLogManager.java  | 17 ++++-
 .../kafka/raft/metadata/MetaLogRaftShim.java       | 28 +++++++-
 15 files changed, 268 insertions(+), 103 deletions(-)

diff --git a/core/src/main/scala/kafka/raft/RaftManager.scala b/core/src/main/scala/kafka/raft/RaftManager.scala
index 1881a1d..3b714f3 100644
--- a/core/src/main/scala/kafka/raft/RaftManager.scala
+++ b/core/src/main/scala/kafka/raft/RaftManager.scala
@@ -92,6 +92,11 @@ trait RaftManager[T] {
     listener: RaftClient.Listener[T]
   ): Unit
 
+  def scheduleAtomicAppend(
+    epoch: Int,
+    records: Seq[T]
+  ): Option[Long]
+
   def scheduleAppend(
     epoch: Int,
     records: Seq[T]
@@ -157,16 +162,32 @@ class KafkaRaftManager[T](
     raftClient.register(listener)
   }
 
+  override def scheduleAtomicAppend(
+    epoch: Int,
+    records: Seq[T]
+  ): Option[Long] = {
+    append(epoch, records, true)
+  }
+
   override def scheduleAppend(
     epoch: Int,
     records: Seq[T]
   ): Option[Long] = {
-    val offset: java.lang.Long = raftClient.scheduleAppend(epoch, records.asJava)
-    if (offset == null) {
-      None
+    append(epoch, records, false)
+  }
+
+  private def append(
+    epoch: Int,
+    records: Seq[T],
+    isAtomic: Boolean
+  ): Option[Long] = {
+    val offset = if (isAtomic) {
+      raftClient.scheduleAtomicAppend(epoch, records.asJava)
     } else {
-      Some(Long.unbox(offset))
+      raftClient.scheduleAppend(epoch, records.asJava)
     }
+
+    Option(offset).map(Long.unbox)
   }
 
   override def handleRequest(
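
The RaftManager change above routes both public methods through one private append helper and converts the client's nullable offset into an Option. A hedged Java equivalent of that helper, written against a hypothetical client interface with the same two append methods (not the actual Kafka RaftClient API), could look like this:

    import java.util.List;
    import java.util.OptionalLong;

    // Hypothetical interface mirroring the two append calls the Scala code makes;
    // both may return null when no offset could be assigned.
    interface RaftClientLike<T> {
        Long scheduleAppend(int epoch, List<T> records);
        Long scheduleAtomicAppend(int epoch, List<T> records);
    }

    final class AppendHelper<T> {
        private final RaftClientLike<T> raftClient;

        AppendHelper(RaftClientLike<T> raftClient) {
            this.raftClient = raftClient;
        }

        // Mirrors KafkaRaftManager.append: pick the atomic or non-atomic path,
        // then translate a null offset into an empty OptionalLong.
        OptionalLong append(int epoch, List<T> records, boolean isAtomic) {
            Long offset = isAtomic
                ? raftClient.scheduleAtomicAppend(epoch, records)
                : raftClient.scheduleAppend(epoch, records);
            return offset == null ? OptionalLong.empty() : OptionalLong.of(offset);
        }
    }
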
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ClientQuotaControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ClientQuotaControlManager.java
index 4aac9e4..9b8e2d6 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ClientQuotaControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ClientQuotaControlManager.java
@@ -86,7 +86,7 @@ public class ClientQuotaControlManager {
             }
         });
 
-        return new ControllerResult<>(outputRecords, outputResults);
+        return ControllerResult.atomicOf(outputRecords, outputResults);
     }
 
     /**
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
index 6e329c7..4748d19 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
@@ -213,7 +213,7 @@ public class ClusterControlManager {
 
         List<ApiMessageAndVersion> records = new ArrayList<>();
         records.add(new ApiMessageAndVersion(record, (short) 0));
-        return new ControllerResult<>(records, new BrokerRegistrationReply(brokerEpoch));
+        return ControllerResult.of(records, new BrokerRegistrationReply(brokerEpoch));
     }
 
     public void replay(RegisterBrokerRecord record) {
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
index 16e58fa..dcfe92d46 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
@@ -83,7 +83,7 @@ public class ConfigurationControlManager {
                 outputRecords,
                 outputResults);
         }
-        return new ControllerResult<>(outputRecords, outputResults);
+        return ControllerResult.atomicOf(outputRecords, outputResults);
     }
 
     private void incrementalAlterConfigResource(ConfigResource configResource,
@@ -171,7 +171,7 @@ public class ConfigurationControlManager {
                 outputRecords,
                 outputResults);
         }
-        return new ControllerResult<>(outputRecords, outputResults);
+        return ControllerResult.atomicOf(outputRecords, outputResults);
     }
 
     private void legacyAlterConfigResource(ConfigResource configResource,
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ControllerResult.java b/metadata/src/main/java/org/apache/kafka/controller/ControllerResult.java
index 4906c8b..e6ae031 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ControllerResult.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ControllerResult.java
@@ -19,7 +19,7 @@ package org.apache.kafka.controller;
 
 import org.apache.kafka.metadata.ApiMessageAndVersion;
 
-import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.stream.Collectors;
@@ -28,15 +28,13 @@ import java.util.stream.Collectors;
 class ControllerResult<T> {
     private final List<ApiMessageAndVersion> records;
     private final T response;
+    private final boolean isAtomic;
 
-    public ControllerResult(T response) {
-        this(new ArrayList<>(), response);
-    }
-
-    public ControllerResult(List<ApiMessageAndVersion> records, T response) {
+    protected ControllerResult(List<ApiMessageAndVersion> records, T response, boolean isAtomic) {
         Objects.requireNonNull(records);
         this.records = records;
         this.response = response;
+        this.isAtomic = isAtomic;
     }
 
     public List<ApiMessageAndVersion> records() {
@@ -47,6 +45,10 @@ class ControllerResult<T> {
         return response;
     }
 
+    public boolean isAtomic() {
+        return isAtomic;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (o == null || (!o.getClass().equals(getClass()))) {
@@ -54,22 +56,34 @@ class ControllerResult<T> {
         }
         ControllerResult other = (ControllerResult) o;
         return records.equals(other.records) &&
-            Objects.equals(response, other.response);
+            Objects.equals(response, other.response) &&
+            Objects.equals(isAtomic, other.isAtomic);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(records, response);
+        return Objects.hash(records, response, isAtomic);
     }
 
     @Override
     public String toString() {
-        return "ControllerResult(records=" + String.join(",",
-            records.stream().map(r -> r.toString()).collect(Collectors.toList())) +
-            ", response=" + response + ")";
+        return String.format(
+            "ControllerResult(records=%s, response=%s, isAtomic=%s)",
+            String.join(",", records.stream().map(ApiMessageAndVersion::toString).collect(Collectors.toList())),
+            response,
+            isAtomic
+        );
     }
 
     public ControllerResult<T> withoutRecords() {
-        return new ControllerResult<>(new ArrayList<>(), response);
+        return new ControllerResult<>(Collections.emptyList(), response, false);
+    }
+
+    public static <T> ControllerResult<T> atomicOf(List<ApiMessageAndVersion> records, T response) {
+        return new ControllerResult<>(records, response, true);
+    }
+
+    public static <T> ControllerResult<T> of(List<ApiMessageAndVersion> records, T response) {
+        return new ControllerResult<>(records, response, false);
     }
 }
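
With the public constructors gone, each control manager now declares at construction time whether its records need an all-or-nothing append; note that even error-only results with an empty record list go through atomicOf in the updated tests. A short usage sketch, reusing the hypothetical ResultSketch class from the note after the summary of changes above (topic ids and errors are modeled as plain Strings):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    final class DeletionManagerSketch {
        // Deletion-style operation: either every RemoveTopicRecord is applied or none is.
        static ResultSketch<Map<String, String>> deleteTopics(List<String> topicIds) {
            List<String> records = new ArrayList<>();
            Map<String, String> results = new HashMap<>();
            for (String id : topicIds) {
                records.add("RemoveTopicRecord(" + id + ")");
                results.put(id, "NONE");
            }
            return ResultSketch.atomicOf(records, results);
        }
    }
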
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ControllerResultAndOffset.java b/metadata/src/main/java/org/apache/kafka/controller/ControllerResultAndOffset.java
index 5e483f7..8b8ca8d 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ControllerResultAndOffset.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ControllerResultAndOffset.java
@@ -19,24 +19,15 @@ package org.apache.kafka.controller;
 
 import org.apache.kafka.metadata.ApiMessageAndVersion;
 
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Objects;
 import java.util.stream.Collectors;
 
 
-class ControllerResultAndOffset<T> extends ControllerResult<T> {
+final class ControllerResultAndOffset<T> extends ControllerResult<T> {
     private final long offset;
 
-    public ControllerResultAndOffset(T response) {
-        super(new ArrayList<>(), response);
-        this.offset = -1;
-    }
-
-    public ControllerResultAndOffset(long offset,
-                              List<ApiMessageAndVersion> records,
-                              T response) {
-        super(records, response);
+    private ControllerResultAndOffset(long offset, ControllerResult<T> result) {
+        super(result.records(), result.response(), result.isAtomic());
         this.offset = offset;
     }
 
@@ -52,18 +43,27 @@ class ControllerResultAndOffset<T> extends ControllerResult<T> {
         ControllerResultAndOffset other = (ControllerResultAndOffset) o;
         return records().equals(other.records()) &&
             response().equals(other.response()) &&
+            isAtomic() == other.isAtomic() &&
             offset == other.offset;
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(records(), response(), offset);
+        return Objects.hash(records(), response(), isAtomic(), offset);
     }
 
     @Override
     public String toString() {
-        return "ControllerResultAndOffset(records=" + String.join(",",
-            records().stream().map(r -> r.toString()).collect(Collectors.toList())) +
-            ", response=" + response() + ", offset=" + offset + ")";
+        return String.format(
+            "ControllerResultAndOffset(records=%s, response=%s, isAtomic=%s, offset=%s)",
+            String.join(",", records().stream().map(ApiMessageAndVersion::toString).collect(Collectors.toList())),
+            response(),
+            isAtomic(),
+            offset
+        );
+    }
+
+    public static <T> ControllerResultAndOffset<T> of(long offset, ControllerResult<T> result) {
+        return new ControllerResultAndOffset<>(offset, result);
     }
 }
diff --git a/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
index 25ff3fd..99874ac 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
@@ -69,7 +69,8 @@ public class FeatureControlManager {
             results.put(entry.getKey(), updateFeature(entry.getKey(), entry.getValue(),
                 downgradeables.contains(entry.getKey()), brokerFeatures, records));
         }
-        return new ControllerResult<>(records, results);
+
+        return ControllerResult.atomicOf(records, results);
     }
 
     private ApiError updateFeature(String featureName,
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 3e5a9a8..6ee1b7e 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -17,7 +17,6 @@
 
 package org.apache.kafka.controller;
 
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -268,7 +267,7 @@ public final class QuorumController implements Controller {
     class ControlEvent implements EventQueue.Event {
         private final String name;
         private final Runnable handler;
-        private long eventCreatedTimeNs = time.nanoseconds();
+        private final long eventCreatedTimeNs = time.nanoseconds();
         private Optional<Long> startProcessingTimeNs = Optional.empty();
 
         ControlEvent(String name, Runnable handler) {
@@ -310,7 +309,7 @@ public final class QuorumController implements Controller {
         private final String name;
         private final CompletableFuture<T> future;
         private final Supplier<T> handler;
-        private long eventCreatedTimeNs = time.nanoseconds();
+        private final long eventCreatedTimeNs = time.nanoseconds();
         private Optional<Long> startProcessingTimeNs = Optional.empty();
 
         ControllerReadEvent(String name, Supplier<T> handler) {
@@ -392,7 +391,7 @@ public final class QuorumController implements Controller {
         private final String name;
         private final CompletableFuture<T> future;
         private final ControllerWriteOperation<T> op;
-        private long eventCreatedTimeNs = time.nanoseconds();
+        private final long eventCreatedTimeNs = time.nanoseconds();
         private Optional<Long> startProcessingTimeNs = Optional.empty();
         private ControllerResultAndOffset<T> resultAndOffset;
 
@@ -426,8 +425,7 @@ public final class QuorumController implements Controller {
                 if (!maybeOffset.isPresent()) {
                     // If the purgatory is empty, there are no pending operations and no
                     // uncommitted state.  We can return immediately.
-                    resultAndOffset = new ControllerResultAndOffset<>(-1,
-                        new ArrayList<>(), result.response());
+                    resultAndOffset = ControllerResultAndOffset.of(-1, result);
                     log.debug("Completing read-only operation {} immediately because " +
                         "the purgatory is empty.", this);
                     complete(null);
@@ -435,8 +433,7 @@ public final class QuorumController implements Controller {
                 }
                 // If there are operations in the purgatory, we want to wait for the latest
                 // one to complete before returning our result to the user.
-                resultAndOffset = new ControllerResultAndOffset<>(maybeOffset.get(),
-                    result.records(), result.response());
+                resultAndOffset = ControllerResultAndOffset.of(maybeOffset.get(), result);
                 log.debug("Read-only operation {} will be completed when the log " +
                     "reaches offset {}", this, resultAndOffset.offset());
             } else {
@@ -444,11 +441,15 @@ public final class QuorumController implements Controller {
                 // written before we can return our result to the user.  Here, we hand off
                 // the batch of records to the metadata log manager.  They will be written
                 // out asynchronously.
-                long offset = logManager.scheduleWrite(controllerEpoch, result.records());
+                final long offset;
+                if (result.isAtomic()) {
+                    offset = logManager.scheduleAtomicWrite(controllerEpoch, result.records());
+                } else {
+                    offset = logManager.scheduleWrite(controllerEpoch, result.records());
+                }
                 op.processBatchEndOffset(offset);
                 writeOffset = offset;
-                resultAndOffset = new ControllerResultAndOffset<>(offset,
-                    result.records(), result.response());
+                resultAndOffset = ControllerResultAndOffset.of(offset, result);
                 for (ApiMessageAndVersion message : result.records()) {
                     replay(message.message(), offset);
                 }
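
The QuorumController change is the consumer side of the flag: when a write event completes, result.isAtomic() selects which log-manager method receives the records. A self-contained sketch of that branch, using hypothetical interfaces rather than the real QuorumController and MetaLogManager types:

    import java.util.List;

    // Hypothetical shapes for illustration only.
    interface LogManagerLike {
        long scheduleWrite(long epoch, List<String> records);
        long scheduleAtomicWrite(long epoch, List<String> records);
    }

    interface ResultLike {
        List<String> records();
        boolean isAtomic();
    }

    final class WriteDispatchSketch {
        // Mirrors the branch added to the controller write event: atomic results go
        // to scheduleAtomicWrite, everything else to scheduleWrite.
        static long dispatch(LogManagerLike logManager, long controllerEpoch, ResultLike result) {
            if (result.isAtomic()) {
                return logManager.scheduleAtomicWrite(controllerEpoch, result.records());
            } else {
                return logManager.scheduleWrite(controllerEpoch, result.records());
            }
        }
    }
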
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index 14ff321..8bc0670 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -440,7 +440,7 @@ public class ReplicationControlManager {
             resultsPrefix = ", ";
         }
         log.info("createTopics result(s): {}", resultsBuilder.toString());
-        return new ControllerResult<>(records, data);
+        return ControllerResult.atomicOf(records, data);
     }
 
     private ApiError createTopic(CreatableTopic topic,
@@ -721,7 +721,7 @@ public class ReplicationControlManager {
                     setIsr(partitionData.newIsr()));
             }
         }
-        return new ControllerResult<>(records, response);
+        return ControllerResult.of(records, response);
     }
 
     /**
@@ -875,7 +875,7 @@ public class ReplicationControlManager {
                     setErrorMessage(error.message()));
             }
         }
-        return new ControllerResult<>(records, response);
+        return ControllerResult.of(records, response);
     }
 
     static boolean electionIsUnclean(byte electionType) {
@@ -970,7 +970,7 @@ public class ReplicationControlManager {
                 states.next().fenced(),
                 states.next().inControlledShutdown(),
                 states.next().shouldShutDown());
-        return new ControllerResult<>(records, reply);
+        return ControllerResult.of(records, reply);
     }
 
     int bestLeader(int[] replicas, int[] isr, boolean unclean) {
@@ -999,7 +999,7 @@ public class ReplicationControlManager {
         }
         List<ApiMessageAndVersion> records = new ArrayList<>();
         handleBrokerUnregistered(brokerId, registration.epoch(), records);
-        return new ControllerResult<>(records, null);
+        return ControllerResult.of(records, null);
     }
 
     ControllerResult<Void> maybeFenceStaleBrokers() {
@@ -1011,6 +1011,6 @@ public class ReplicationControlManager {
             handleBrokerFenced(brokerId, records);
             heartbeatManager.fence(brokerId);
         }
-        return new ControllerResult<>(records, null);
+        return ControllerResult.of(records, null);
     }
 }
diff --git a/metadata/src/main/java/org/apache/kafka/metalog/LocalLogManager.java b/metadata/src/main/java/org/apache/kafka/metalog/LocalLogManager.java
index ef85314..99ae3a7 100644
--- a/metadata/src/main/java/org/apache/kafka/metalog/LocalLogManager.java
+++ b/metadata/src/main/java/org/apache/kafka/metalog/LocalLogManager.java
@@ -328,8 +328,21 @@ public final class LocalLogManager implements MetaLogManager, AutoCloseable {
 
     @Override
     public long scheduleWrite(long epoch, List<ApiMessageAndVersion> batch) {
-        return shared.tryAppend(nodeId, leader.epoch(), new LocalRecordBatch(
-            batch.stream().map(r -> r.message()).collect(Collectors.toList())));
+        return scheduleAtomicWrite(epoch, batch);
+    }
+
+    @Override
+    public long scheduleAtomicWrite(long epoch, List<ApiMessageAndVersion> batch) {
+        return shared.tryAppend(
+            nodeId,
+            leader.epoch(),
+            new LocalRecordBatch(
+                batch
+                    .stream()
+                    .map(ApiMessageAndVersion::message)
+                    .collect(Collectors.toList())
+            )
+        );
     }
 
     @Override
diff --git a/metadata/src/main/java/org/apache/kafka/metalog/MetaLogManager.java b/metadata/src/main/java/org/apache/kafka/metalog/MetaLogManager.java
index 67a6ca5..9126245 100644
--- a/metadata/src/main/java/org/apache/kafka/metalog/MetaLogManager.java
+++ b/metadata/src/main/java/org/apache/kafka/metalog/MetaLogManager.java
@@ -50,14 +50,31 @@ public interface MetaLogManager {
      * offset before renouncing its leadership.  The listener should determine this by
      * monitoring the committed offsets.
      *
-     * @param epoch         The controller epoch.
-     * @param batch         The batch of messages to write.
+     * @param epoch         the controller epoch
+     * @param batch         the batch of messages to write
      *
-     * @return              The offset of the message.
+     * @return              the offset of the last message in the batch
+     * @throws IllegalArgumentException if buffer allocation failed and the client should back off
      */
     long scheduleWrite(long epoch, List<ApiMessageAndVersion> batch);
 
     /**
+     * Schedule an atomic write to the log.
+     *
+     * The write will be scheduled to happen at some time in the future.  All of the messages in the
+     * batch will be appended atomically as one batch.  The listener may regard the write as successful
+     * if and only if the MetaLogManager reaches the given offset before renouncing its leadership.
+     * The listener should determine this by monitoring the committed offsets.
+     *
+     * @param epoch         the controller epoch
+     * @param batch         the batch of messages to write
+     *
+     * @return              the offset of the last message in the batch
+     * @throws IllegalArgumentException if buffer allocation failed and the client should back off
+     */
+    long scheduleAtomicWrite(long epoch, List<ApiMessageAndVersion> batch);
+
+    /**
      * Renounce the leadership.
      *
      * @param epoch         The epoch.  If this does not match the current epoch, this
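
The new Javadoc states that both scheduleWrite and scheduleAtomicWrite throw IllegalArgumentException when a buffer cannot be allocated and the caller should back off. A caller-side sketch under that assumption, reusing the hypothetical LogManagerLike interface from the QuorumController note above (the retry policy implied here is illustrative, not part of the Kafka API):

    import java.util.List;
    import java.util.OptionalLong;

    final class BackoffWriteSketch {
        // Attempt one atomic write; on allocation failure return "no offset yet" so the
        // caller can back off and retry later instead of failing the whole operation.
        static OptionalLong tryScheduleAtomicWrite(LogManagerLike logManager,
                                                   long epoch,
                                                   List<String> batch) {
            try {
                return OptionalLong.of(logManager.scheduleAtomicWrite(epoch, batch));
            } catch (IllegalArgumentException e) {
                return OptionalLong.empty();
            }
        }
    }
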
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
index 49a5533..561a25b 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
@@ -135,18 +135,42 @@ public class ConfigurationControlManagerTest {
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
         ConfigurationControlManager manager =
             new ConfigurationControlManager(new LogContext(), snapshotRegistry, CONFIGS);
-        assertEquals(new ControllerResult<Map<ConfigResource, ApiError>>(Collections.singletonList(
-            new ApiMessageAndVersion(new ConfigRecord().
-                setResourceType(TOPIC.id()).setResourceName("mytopic").
-                setName("abc").setValue("123"), (short) 0)),
-            toMap(entry(BROKER0, new ApiError(
-                Errors.INVALID_REQUEST, "A DELETE op was given with a non-null value.")),
-                entry(MYTOPIC, ApiError.NONE))),
-            manager.incrementalAlterConfigs(toMap(entry(BROKER0, toMap(
-                entry("foo.bar", entry(DELETE, "abc")),
-                entry("quux", entry(SET, "abc")))),
-            entry(MYTOPIC, toMap(
-                entry("abc", entry(APPEND, "123")))))));
+        assertEquals(
+            ControllerResult.atomicOf(
+                Collections.singletonList(
+                    new ApiMessageAndVersion(
+                        new ConfigRecord()
+                            .setResourceType(TOPIC.id())
+                            .setResourceName("mytopic")
+                            .setName("abc")
+                            .setValue("123"),
+                        (short) 0
+                    )
+                ),
+                toMap(
+                    entry(
+                        BROKER0,
+                        new ApiError(
+                            Errors.INVALID_REQUEST,
+                            "A DELETE op was given with a non-null value."
+                        )
+                    ),
+                    entry(MYTOPIC, ApiError.NONE)
+                )
+            ),
+            manager.incrementalAlterConfigs(
+                toMap(
+                    entry(
+                        BROKER0,
+                        toMap(
+                            entry("foo.bar", entry(DELETE, "abc")),
+                            entry("quux", entry(SET, "abc"))
+                        )
+                    ),
+                    entry(MYTOPIC, toMap(entry("abc", entry(APPEND, "123"))))
+                )
+            )
+        );
     }
 
     @Test
@@ -184,20 +208,33 @@ public class ConfigurationControlManagerTest {
             new ApiMessageAndVersion(new ConfigRecord().
                 setResourceType(TOPIC.id()).setResourceName("mytopic").
                 setName("def").setValue("901"), (short) 0));
-        assertEquals(new ControllerResult<Map<ConfigResource, ApiError>>(
+        assertEquals(
+            ControllerResult.atomicOf(
                 expectedRecords1,
-                toMap(entry(MYTOPIC, ApiError.NONE))),
-            manager.legacyAlterConfigs(toMap(entry(MYTOPIC, toMap(
-                entry("abc", "456"), entry("def", "901"))))));
+                toMap(entry(MYTOPIC, ApiError.NONE))
+            ),
+            manager.legacyAlterConfigs(
+                toMap(entry(MYTOPIC, toMap(entry("abc", "456"), entry("def", "901"))))
+            )
+        );
         for (ApiMessageAndVersion message : expectedRecords1) {
             manager.replay((ConfigRecord) message.message());
         }
-        assertEquals(new ControllerResult<Map<ConfigResource, ApiError>>(Arrays.asList(
-            new ApiMessageAndVersion(new ConfigRecord().
-                setResourceType(TOPIC.id()).setResourceName("mytopic").
-                setName("abc").setValue(null), (short) 0)),
-                toMap(entry(MYTOPIC, ApiError.NONE))),
-            manager.legacyAlterConfigs(toMap(entry(MYTOPIC, toMap(
-                entry("def", "901"))))));
+        assertEquals(
+            ControllerResult.atomicOf(
+                Arrays.asList(
+                    new ApiMessageAndVersion(
+                        new ConfigRecord()
+                            .setResourceType(TOPIC.id())
+                            .setResourceName("mytopic")
+                            .setName("abc")
+                            .setValue(null),
+                        (short) 0
+                    )
+                ),
+                toMap(entry(MYTOPIC, ApiError.NONE))
+            ),
+            manager.legacyAlterConfigs(toMap(entry(MYTOPIC, toMap(entry("def", "901")))))
+        );
     }
 }
diff --git a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
index 8687cc8..0670984 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
@@ -18,10 +18,8 @@
 package org.apache.kafka.controller;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import org.apache.kafka.common.metadata.FeatureLevelRecord;
@@ -61,11 +59,11 @@ public class FeatureControlManagerTest {
             rangeMap("foo", 1, 2), snapshotRegistry);
         assertEquals(new FeatureMapAndEpoch(new FeatureMap(Collections.emptyMap()), -1),
             manager.finalizedFeatures(-1));
-        assertEquals(new ControllerResult<>(Collections.
+        assertEquals(ControllerResult.atomicOf(Collections.emptyList(), Collections.
                 singletonMap("foo", new ApiError(Errors.INVALID_UPDATE_VERSION,
                     "The controller does not support the given feature range."))),
             manager.updateFeatures(rangeMap("foo", 1, 3),
-                new HashSet<>(Arrays.asList("foo")),
+                Collections.singleton("foo"),
                 Collections.emptyMap()));
         ControllerResult<Map<String, ApiError>> result = manager.updateFeatures(
             rangeMap("foo", 1, 2, "bar", 1, 1), Collections.emptySet(),
@@ -101,12 +99,24 @@ public class FeatureControlManagerTest {
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
         FeatureControlManager manager = new FeatureControlManager(
             rangeMap("foo", 1, 5, "bar", 1, 2), snapshotRegistry);
-        assertEquals(new ControllerResult<>(Collections.
-                singletonMap("foo", new ApiError(Errors.INVALID_UPDATE_VERSION,
-                    "Broker 5 does not support the given feature range."))),
-            manager.updateFeatures(rangeMap("foo", 1, 3),
-                new HashSet<>(Arrays.asList("foo")),
-                Collections.singletonMap(5, rangeMap())));
+
+        assertEquals(
+            ControllerResult.atomicOf(
+                Collections.emptyList(),
+                Collections.singletonMap(
+                    "foo",
+                    new ApiError(
+                        Errors.INVALID_UPDATE_VERSION,
+                        "Broker 5 does not support the given feature range."
+                    )
+                )
+            ),
+            manager.updateFeatures(
+                rangeMap("foo", 1, 3),
+                Collections.singleton("foo"),
+                Collections.singletonMap(5, rangeMap())
+            )
+        );
 
         ControllerResult<Map<String, ApiError>> result = manager.updateFeatures(
             rangeMap("foo", 1, 3), Collections.emptySet(), Collections.emptyMap());
@@ -114,19 +124,31 @@ public class FeatureControlManagerTest {
         manager.replay((FeatureLevelRecord) result.records().get(0).message(), 3);
         snapshotRegistry.createSnapshot(3);
 
-        assertEquals(new ControllerResult<>(Collections.
+        assertEquals(ControllerResult.atomicOf(Collections.emptyList(), Collections.
                 singletonMap("foo", new ApiError(Errors.INVALID_UPDATE_VERSION,
                     "Can't downgrade the maximum version of this feature without " +
                     "setting downgradable to true."))),
             manager.updateFeatures(rangeMap("foo", 1, 2),
                 Collections.emptySet(), Collections.emptyMap()));
 
-        assertEquals(new ControllerResult<>(
-            Collections.singletonList(new ApiMessageAndVersion(new FeatureLevelRecord().
-                    setName("foo").setMinFeatureLevel((short) 1).setMaxFeatureLevel((short) 2),
-                    (short) 0)),
-                Collections.singletonMap("foo", ApiError.NONE)),
-            manager.updateFeatures(rangeMap("foo", 1, 2),
-                new HashSet<>(Collections.singletonList("foo")), Collections.emptyMap()));
+        assertEquals(
+            ControllerResult.atomicOf(
+                Collections.singletonList(
+                    new ApiMessageAndVersion(
+                        new FeatureLevelRecord()
+                            .setName("foo")
+                            .setMinFeatureLevel((short) 1)
+                            .setMaxFeatureLevel((short) 2),
+                        (short) 0
+                    )
+                ),
+                Collections.singletonMap("foo", ApiError.NONE)
+            ),
+            manager.updateFeatures(
+                rangeMap("foo", 1, 2),
+                Collections.singleton("foo"),
+                Collections.emptyMap()
+            )
+        );
     }
 }
diff --git a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
index 7b6cf06..590f89c 100644
--- a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
+++ b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
@@ -371,8 +371,21 @@ public final class LocalLogManager implements MetaLogManager, AutoCloseable {
 
     @Override
     public long scheduleWrite(long epoch, List<ApiMessageAndVersion> batch) {
-        return shared.tryAppend(nodeId, leader.epoch(), new LocalRecordBatch(
-            batch.stream().map(r -> r.message()).collect(Collectors.toList())));
+        return scheduleAtomicWrite(epoch, batch);
+    }
+
+    @Override
+    public long scheduleAtomicWrite(long epoch, List<ApiMessageAndVersion> batch) {
+        return shared.tryAppend(
+            nodeId,
+            leader.epoch(),
+            new LocalRecordBatch(
+                batch
+                    .stream()
+                    .map(ApiMessageAndVersion::message)
+                    .collect(Collectors.toList())
+            )
+        );
     }
 
     @Override
diff --git a/raft/src/main/java/org/apache/kafka/raft/metadata/MetaLogRaftShim.java b/raft/src/main/java/org/apache/kafka/raft/metadata/MetaLogRaftShim.java
index bf88e7d..1ca63f1 100644
--- a/raft/src/main/java/org/apache/kafka/raft/metadata/MetaLogRaftShim.java
+++ b/raft/src/main/java/org/apache/kafka/raft/metadata/MetaLogRaftShim.java
@@ -53,8 +53,34 @@ public class MetaLogRaftShim implements MetaLogManager {
     }
 
     @Override
+    public long scheduleAtomicWrite(long epoch, List<ApiMessageAndVersion> batch) {
+        return write(epoch, batch, true);
+    }
+
+    @Override
     public long scheduleWrite(long epoch, List<ApiMessageAndVersion> batch) {
-        return client.scheduleAppend((int) epoch, batch);
+        return write(epoch, batch, false);
+    }
+
+    private long write(long epoch, List<ApiMessageAndVersion> batch, boolean isAtomic) {
+        final Long result;
+        if (isAtomic) {
+            result = client.scheduleAtomicAppend((int) epoch, batch);
+        } else {
+            result = client.scheduleAppend((int) epoch, batch);
+        }
+
+        if (result == null) {
+            throw new IllegalArgumentException(
+                String.format(
+                    "Unable to alloate a buffer for the schedule write operation: epoch %s, batch %s)",
+                    epoch,
+                    batch
+                )
+            );
+        } else {
+            return result;
+        }
     }
 
     @Override