You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2022/05/04 17:32:18 UTC

[kafka] branch trunk updated: KAFKA-13861; Fix the validateOnly behavior for CreatePartitions requests in KRaft mode (#12106)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 430f75ba22 KAFKA-13861; Fix the validateOnly behavior for CreatePartitions requests in KRaft mode (#12106)
430f75ba22 is described below

commit 430f75ba227b8f600ee7fafc2b83e2e97c38f03b
Author: Akhilesh Chaganti <ak...@users.noreply.github.com>
AuthorDate: Wed May 4 10:31:46 2022 -0700

    KAFKA-13861; Fix the validateOnly behavior for CreatePartitions requests in KRaft mode (#12106)
    
    The KRaft implementation of `CreatePartitions` ignored the `validateOnly` flag in the
    request and created the partitions whenever validation succeeded. This patch fixes the
    behavior so that no partitions are created when the `validateOnly` flag is true.
    
    Reviewers: Divij Vaidya <di...@gmail.com>, dengziming <de...@gmail.com>, Jason Gustafson <ja...@confluent.io>
---
 .../main/scala/kafka/server/ControllerApis.scala   |   2 +-
 core/src/test/java/kafka/test/MockController.java  |   3 +-
 .../kafka/api/PlaintextAdminIntegrationTest.scala  | 119 ++++++++++++++++-----
 .../unit/kafka/server/ControllerApisTest.scala     |  32 ++++--
 .../org/apache/kafka/controller/Controller.java    |   5 +-
 .../apache/kafka/controller/QuorumController.java  |  16 ++-
 .../controller/ReplicationControlManager.java      |   2 +-
 .../kafka/controller/QuorumControllerTest.java     |   4 +-
 8 files changed, 139 insertions(+), 44 deletions(-)

diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala b/core/src/main/scala/kafka/server/ControllerApis.scala
index 3bd4ca5fce..0aaab0aefb 100644
--- a/core/src/main/scala/kafka/server/ControllerApis.scala
+++ b/core/src/main/scala/kafka/server/ControllerApis.scala
@@ -767,7 +767,7 @@ class ControllerApis(val requestChannel: RequestChannel,
           setErrorCode(TOPIC_AUTHORIZATION_FAILED.code))
       }
     }
-    controller.createPartitions(context, topics).thenApply { results =>
+    controller.createPartitions(context, topics, request.validateOnly).thenApply { results =>
       results.forEach(response => responses.add(response))
       responses
     }
diff --git a/core/src/test/java/kafka/test/MockController.java b/core/src/test/java/kafka/test/MockController.java
index f6bfd207d4..ff1154d211 100644
--- a/core/src/test/java/kafka/test/MockController.java
+++ b/core/src/test/java/kafka/test/MockController.java
@@ -413,7 +413,8 @@ public class MockController implements Controller {
     @Override
     synchronized public CompletableFuture<List<CreatePartitionsTopicResult>> createPartitions(
         ControllerRequestContext context,
-        List<CreatePartitionsTopic> topicList
+        List<CreatePartitionsTopic> topicList,
+        boolean validateOnly
     ) {
         if (!active) {
             CompletableFuture<List<CreatePartitionsTopicResult>> future = new CompletableFuture<>();
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index b095a6170c..81e9f692a8 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -25,12 +25,11 @@ import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
 import java.util.concurrent.{CountDownLatch, ExecutionException, TimeUnit}
 import java.util.{Collections, Optional, Properties}
 import java.{time, util}
-
 import kafka.log.LogConfig
 import kafka.security.authorizer.AclEntry
 import kafka.server.{Defaults, DynamicConfig, KafkaConfig, KafkaServer}
 import kafka.utils.TestUtils._
-import kafka.utils.{Log4jController, TestUtils}
+import kafka.utils.{Log4jController, TestInfoUtils, TestUtils}
 import kafka.zk.KafkaZkClient
 import org.apache.kafka.clients.HostResolver
 import org.apache.kafka.clients.admin.AlterConfigOp.OpType
@@ -425,8 +424,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     checkValidAlterConfigs(client, topicResource1, topicResource2)
   }
 
-  @Test
-  def testCreatePartitions(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testCreatePartitions(quorum: String): Unit = {
     client = Admin.create(createConfig)
 
     // Create topics
@@ -490,7 +490,12 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
       var e = assertThrows(classOf[ExecutionException], () => alterResult.values.get(topic1).get,
         () => s"$desc: Expect InvalidPartitionsException when newCount is a decrease")
       assertTrue(e.getCause.isInstanceOf[InvalidPartitionsException], desc)
-      assertEquals("Topic currently has 3 partitions, which is higher than the requested 1.", e.getCause.getMessage, desc)
+      var exceptionMsgStr = if (isKRaftTest()) {
+        "The topic create-partitions-topic-1 currently has 3 partition(s); 1 would not be an increase."
+      } else {
+        "Topic currently has 3 partitions, which is higher than the requested 1."
+      }
+      assertEquals(exceptionMsgStr, e.getCause.getMessage, desc)
       assertEquals(3, numPartitions(topic1), desc)
 
       // try a newCount which would be a noop (without assignment)
@@ -499,7 +504,12 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
       e = assertThrows(classOf[ExecutionException], () => alterResult.values.get(topic2).get,
         () => s"$desc: Expect InvalidPartitionsException when requesting a noop")
       assertTrue(e.getCause.isInstanceOf[InvalidPartitionsException], desc)
-      assertEquals("Topic already has 3 partitions.", e.getCause.getMessage, desc)
+      exceptionMsgStr = if (isKRaftTest()) {
+        "Topic already has 3 partition(s)."
+      } else {
+        "Topic already has 3 partitions."
+      }
+      assertEquals(exceptionMsgStr, e.getCause.getMessage, desc)
       assertEquals(3, numPartitions(topic2, Some(3)), desc)
 
       // try a newCount which would be a noop (where the assignment matches current state)
@@ -507,7 +517,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
         NewPartitions.increaseTo(3, newPartition2Assignments)).asJava, option)
       e = assertThrows(classOf[ExecutionException], () => alterResult.values.get(topic2).get)
       assertTrue(e.getCause.isInstanceOf[InvalidPartitionsException], desc)
-      assertEquals("Topic already has 3 partitions.", e.getCause.getMessage, desc)
+      assertEquals(exceptionMsgStr, e.getCause.getMessage, desc)
       assertEquals(3, numPartitions(topic2, Some(3)), desc)
 
       // try a newCount which would be a noop (where the assignment doesn't match current state)
@@ -515,7 +525,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
         NewPartitions.increaseTo(3, newPartition2Assignments.asScala.reverse.toList.asJava)).asJava, option)
       e = assertThrows(classOf[ExecutionException], () => alterResult.values.get(topic2).get)
       assertTrue(e.getCause.isInstanceOf[InvalidPartitionsException], desc)
-      assertEquals("Topic already has 3 partitions.", e.getCause.getMessage, desc)
+      assertEquals(exceptionMsgStr, e.getCause.getMessage, desc)
       assertEquals(3, numPartitions(topic2, Some(3)), desc)
 
       // try a bad topic name
@@ -525,7 +535,12 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
       e = assertThrows(classOf[ExecutionException], () => alterResult.values.get(unknownTopic).get,
         () => s"$desc: Expect InvalidTopicException when using an unknown topic")
       assertTrue(e.getCause.isInstanceOf[UnknownTopicOrPartitionException], desc)
-      assertEquals("The topic 'an-unknown-topic' does not exist.", e.getCause.getMessage, desc)
+      exceptionMsgStr = if (isKRaftTest()) {
+        "This server does not host this topic-partition."
+      } else {
+        "The topic 'an-unknown-topic' does not exist."
+      }
+      assertEquals(exceptionMsgStr, e.getCause.getMessage, desc)
 
       // try an invalid newCount
       alterResult = client.createPartitions(Map(topic1 ->
@@ -533,7 +548,12 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
       e = assertThrows(classOf[ExecutionException], () => alterResult.values.get(topic1).get,
         () => s"$desc: Expect InvalidPartitionsException when newCount is invalid")
       assertTrue(e.getCause.isInstanceOf[InvalidPartitionsException], desc)
-      assertEquals("Topic currently has 3 partitions, which is higher than the requested -22.", e.getCause.getMessage,
+      exceptionMsgStr = if (isKRaftTest()) {
+        "The topic create-partitions-topic-1 currently has 3 partition(s); -22 would not be an increase."
+      } else {
+        "Topic currently has 3 partitions, which is higher than the requested -22."
+      }
+      assertEquals(exceptionMsgStr, e.getCause.getMessage,
         desc)
       assertEquals(3, numPartitions(topic1), desc)
 
@@ -543,9 +563,14 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
       e = assertThrows(classOf[ExecutionException], () => alterResult.values.get(topic1).get,
         () => s"$desc: Expect InvalidPartitionsException when #brokers != replication factor")
       assertTrue(e.getCause.isInstanceOf[InvalidReplicaAssignmentException], desc)
-      assertEquals("Inconsistent replication factor between partitions, partition 0 has 1 " +
-        "while partitions [3] have replication factors [2], respectively.",
-        e.getCause.getMessage, desc)
+      exceptionMsgStr = if (isKRaftTest()) {
+        "The manual partition assignment includes a partition with 2 replica(s), but this is not " +
+          "consistent with previous partitions, which have 1 replica(s)."
+      } else {
+        "Inconsistent replication factor between partitions, partition 0 has 1 while partitions [3] " +
+          "have replication factors [2], respectively."
+      }
+      assertEquals(exceptionMsgStr, e.getCause.getMessage, desc)
       assertEquals(3, numPartitions(topic1), desc)
 
       // try #assignments < with the increase
@@ -554,7 +579,12 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
       e = assertThrows(classOf[ExecutionException], () => alterResult.values.get(topic1).get,
         () => s"$desc: Expect InvalidReplicaAssignmentException when #assignments != newCount - oldCount")
       assertTrue(e.getCause.isInstanceOf[InvalidReplicaAssignmentException], desc)
-      assertEquals("Increasing the number of partitions by 3 but 1 assignments provided.", e.getCause.getMessage, desc)
+      exceptionMsgStr = if (isKRaftTest()) {
+        "Attempted to add 3 additional partition(s), but only 1 assignment(s) were specified."
+      } else {
+        "Increasing the number of partitions by 3 but 1 assignments provided."
+      }
+      assertEquals(exceptionMsgStr, e.getCause.getMessage, desc)
       assertEquals(3, numPartitions(topic1), desc)
 
       // try #assignments > with the increase
@@ -562,8 +592,13 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
         NewPartitions.increaseTo(4, asList(asList(1), asList(2)))).asJava, option)
       e = assertThrows(classOf[ExecutionException], () => alterResult.values.get(topic1).get,
         () => s"$desc: Expect InvalidReplicaAssignmentException when #assignments != newCount - oldCount")
+      exceptionMsgStr = if (isKRaftTest()) {
+        "Attempted to add 1 additional partition(s), but only 2 assignment(s) were specified."
+      } else {
+        "Increasing the number of partitions by 1 but 2 assignments provided."
+      }
       assertTrue(e.getCause.isInstanceOf[InvalidReplicaAssignmentException], desc)
-      assertEquals("Increasing the number of partitions by 1 but 2 assignments provided.", e.getCause.getMessage, desc)
+      assertEquals(exceptionMsgStr, e.getCause.getMessage, desc)
       assertEquals(3, numPartitions(topic1), desc)
 
       // try with duplicate brokers in assignments
@@ -572,8 +607,12 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
       e = assertThrows(classOf[ExecutionException], () => alterResult.values.get(topic1).get,
         () => s"$desc: Expect InvalidReplicaAssignmentException when assignments has duplicate brokers")
       assertTrue(e.getCause.isInstanceOf[InvalidReplicaAssignmentException], desc)
-      assertEquals("Duplicate brokers not allowed in replica assignment: 1, 1 for partition id 3.",
-        e.getCause.getMessage, desc)
+      exceptionMsgStr = if (isKRaftTest()) {
+        "The manual partition assignment includes the broker 1 more than once."
+      } else {
+        "Duplicate brokers not allowed in replica assignment: 1, 1 for partition id 3."
+      }
+      assertEquals(exceptionMsgStr, e.getCause.getMessage, desc)
       assertEquals(3, numPartitions(topic1), desc)
 
       // try assignments with differently sized inner lists
@@ -582,8 +621,14 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
       e = assertThrows(classOf[ExecutionException], () => alterResult.values.get(topic1).get,
         () => s"$desc: Expect InvalidReplicaAssignmentException when assignments have differently sized inner lists")
       assertTrue(e.getCause.isInstanceOf[InvalidReplicaAssignmentException], desc)
-      assertEquals("Inconsistent replication factor between partitions, partition 0 has 1 " +
-        "while partitions [4] have replication factors [2], respectively.", e.getCause.getMessage, desc)
+      exceptionMsgStr = if (isKRaftTest()) {
+        "The manual partition assignment includes a partition with 2 replica(s), but this is not " +
+          "consistent with previous partitions, which have 1 replica(s)."
+      } else {
+        "Inconsistent replication factor between partitions, partition 0 has 1 " +
+          "while partitions [4] have replication factors [2], respectively."
+      }
+      assertEquals(exceptionMsgStr, e.getCause.getMessage, desc)
       assertEquals(3, numPartitions(topic1), desc)
 
       // try assignments with unknown brokers
@@ -592,7 +637,12 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
       e = assertThrows(classOf[ExecutionException], () => alterResult.values.get(topic1).get,
         () => s"$desc: Expect InvalidReplicaAssignmentException when assignments contains an unknown broker")
       assertTrue(e.getCause.isInstanceOf[InvalidReplicaAssignmentException], desc)
-      assertEquals("Unknown broker(s) in replica assignment: 12.", e.getCause.getMessage, desc)
+      exceptionMsgStr = if (isKRaftTest()) {
+        "The manual partition assignment includes broker 12, but no such broker is registered."
+      } else {
+        "Unknown broker(s) in replica assignment: 12."
+      }
+      assertEquals(exceptionMsgStr, e.getCause.getMessage, desc)
       assertEquals(3, numPartitions(topic1), desc)
 
       // try with empty assignments
@@ -601,7 +651,12 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
       e = assertThrows(classOf[ExecutionException], () => alterResult.values.get(topic1).get,
         () => s"$desc: Expect InvalidReplicaAssignmentException when assignments is empty")
       assertTrue(e.getCause.isInstanceOf[InvalidReplicaAssignmentException], desc)
-      assertEquals("Increasing the number of partitions by 1 but 0 assignments provided.", e.getCause.getMessage, desc)
+      exceptionMsgStr = if (isKRaftTest()) {
+        "Attempted to add 1 additional partition(s), but only 0 assignment(s) were specified."
+      } else {
+        "Increasing the number of partitions by 1 but 0 assignments provided."
+      }
+      assertEquals(exceptionMsgStr, e.getCause.getMessage, desc)
       assertEquals(3, numPartitions(topic1), desc)
     }
 
@@ -614,18 +669,30 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     TestUtils.waitUntilTrue(() => numPartitions(topic1) == 4, "Timed out waiting for new partitions to appear")
     var e = assertThrows(classOf[ExecutionException], () => alterResult.values.get(topic2).get)
     assertTrue(e.getCause.isInstanceOf[InvalidPartitionsException])
-    assertEquals("Topic currently has 3 partitions, which is higher than the requested 2.", e.getCause.getMessage)
+    val exceptionMsgStr = if (isKRaftTest()) {
+      "The topic create-partitions-topic-2 currently has 3 partition(s); 2 would not be an increase."
+    } else {
+      "Topic currently has 3 partitions, which is higher than the requested 2."
+    }
+    assertEquals(exceptionMsgStr, e.getCause.getMessage)
     assertEquals(3, numPartitions(topic2))
 
-    // finally, try to add partitions to a topic queued for deletion
+    // Delete the topic. Verify addition of partitions to deleted topic is not possible. In
+    // Zookeeper mode, the topic is queued for deletion. In KRaft, the deletion occurs
+    // immediately and hence we have a different Exception thrown in the response.
     val deleteResult = client.deleteTopics(asList(topic1))
     deleteResult.topicNameValues.get(topic1).get
     alterResult = client.createPartitions(Map(topic1 ->
       NewPartitions.increaseTo(4)).asJava, validateOnly)
     e = assertThrows(classOf[ExecutionException], () => alterResult.values.get(topic1).get,
-      () => "Expect InvalidTopicException when the topic is queued for deletion")
-    assertTrue(e.getCause.isInstanceOf[InvalidTopicException])
-    assertEquals("The topic is queued for deletion.", e.getCause.getMessage)
+      () => "Expect InvalidTopicException or UnknownTopicOrPartitionException when the topic is queued for deletion")
+    if (isKRaftTest()) {
+      assertTrue(e.getCause.isInstanceOf[UnknownTopicOrPartitionException], e.toString)
+      assertEquals("This server does not host this topic-partition.", e.getCause.getMessage)
+    } else {
+      assertTrue(e.getCause.isInstanceOf[InvalidTopicException], e.toString)
+      assertEquals("The topic is queued for deletion.", e.getCause.getMessage)
+    }
   }
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
index d28ee38db6..9f0ae482c5 100644
--- a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
@@ -20,9 +20,8 @@ package kafka.server
 import java.net.InetAddress
 import java.util
 import java.util.Collections.singletonList
-import java.util.Properties
+import java.util.{Collections, Properties}
 import java.util.concurrent.{CompletableFuture, ExecutionException}
-
 import kafka.network.RequestChannel
 import kafka.raft.RaftManager
 import kafka.server.QuotaFactory.QuotaManagers
@@ -59,6 +58,8 @@ import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, A
 import org.apache.kafka.server.common.ApiMessageAndVersion
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, Test}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
 import org.mockito.ArgumentMatchers._
 import org.mockito.Mockito._
 import org.mockito.{ArgumentCaptor, ArgumentMatchers}
@@ -717,11 +718,10 @@ class ControllerApisTest {
         _ => Set("foo", "bar")))
   }
 
-  @Test
-  def testCreatePartitionsRequest(): Unit = {
-    val controller = new MockController.Builder().
-      newInitialTopic("foo", Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q")).
-      newInitialTopic("bar", Uuid.fromString("VlFu5c51ToiNx64wtwkhQw")).build()
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testCreatePartitionsRequest(validateOnly: Boolean): Unit = {
+    val controller = mock(classOf[Controller])
     val controllerApis = createControllerApis(None, controller)
     val request = new CreatePartitionsRequestData()
     request.topics().add(new CreatePartitionsTopic().setName("foo").setAssignments(null).setCount(5))
@@ -729,9 +729,23 @@ class ControllerApisTest {
     request.topics().add(new CreatePartitionsTopic().setName("bar").setAssignments(null).setCount(5))
     request.topics().add(new CreatePartitionsTopic().setName("bar").setAssignments(null).setCount(5))
     request.topics().add(new CreatePartitionsTopic().setName("baz").setAssignments(null).setCount(5))
+    request.setValidateOnly(validateOnly)
+
+    // Check if the controller is called correctly with the 'validateOnly' field set appropriately.
+    when(controller.createPartitions(
+      any(),
+      ArgumentMatchers.eq(
+        Collections.singletonList(
+          new CreatePartitionsTopic().setName("foo").setAssignments(null).setCount(5))),
+      ArgumentMatchers.eq(validateOnly))).thenReturn(CompletableFuture
+      .completedFuture(Collections.singletonList(
+        new CreatePartitionsTopicResult().setName("foo").
+          setErrorCode(NONE.code()).
+          setErrorMessage(null)
+      )))
     assertEquals(Set(new CreatePartitionsTopicResult().setName("foo").
-        setErrorCode(NONE.code()).
-        setErrorMessage(null),
+      setErrorCode(NONE.code()).
+      setErrorMessage(null),
       new CreatePartitionsTopicResult().setName("bar").
         setErrorCode(INVALID_REQUEST.code()).
         setErrorMessage("Duplicate topic name."),
diff --git a/metadata/src/main/java/org/apache/kafka/controller/Controller.java b/metadata/src/main/java/org/apache/kafka/controller/Controller.java
index 6da8fb5eda..3622fe225d 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/Controller.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/Controller.java
@@ -328,11 +328,14 @@ public interface Controller extends AclMutator, AutoCloseable {
      * Create partitions on certain topics.
      *
      * @param topics        The list of topics to create partitions for.
+     * @param validateOnly  If true, the request is validated, but no partitions will be created.
+     *
      * @return              A future yielding per-topic results.
      */
     CompletableFuture<List<CreatePartitionsTopicResult>> createPartitions(
         ControllerRequestContext context,
-        List<CreatePartitionsTopic> topics
+        List<CreatePartitionsTopic> topics,
+        boolean validateOnly
     );
 
     /**
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index ff8ff78520..9ff6a59957 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -1664,13 +1664,23 @@ public final class QuorumController implements Controller {
     @Override
     public CompletableFuture<List<CreatePartitionsTopicResult>> createPartitions(
         ControllerRequestContext context,
-        List<CreatePartitionsTopic> topics
+        List<CreatePartitionsTopic> topics,
+        boolean validateOnly
     ) {
         if (topics.isEmpty()) {
             return CompletableFuture.completedFuture(Collections.emptyList());
         }
-        return appendWriteEvent("createPartitions", context.deadlineNs(),
-            () -> replicationControl.createPartitions(topics));
+
+        return appendWriteEvent("createPartitions", context.deadlineNs(), () -> {
+            final ControllerResult<List<CreatePartitionsTopicResult>> result = replicationControl.createPartitions(topics);
+            if (validateOnly) {
+                log.debug("Validate-only CreatePartitions result(s): {}", result.response());
+                return result.withoutRecords();
+            } else {
+                log.debug("CreatePartitions result(s): {}", result.response());
+                return result;
+            }
+        });
     }
 
     @Override
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index dfa3009ea2..f660938bd8 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -1364,7 +1364,7 @@ public class ReplicationControlManager {
                 setErrorCode(apiError.error().code()).
                 setErrorMessage(apiError.message()));
         }
-        return new ControllerResult<>(records, results, true);
+        return ControllerResult.atomicOf(records, results);
     }
 
     void createPartitions(CreatePartitionsTopic topic,
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index 9f569251de..9c679ad7d1 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -808,7 +808,7 @@ public class QuorumControllerTest {
                     controller.findTopicNames(context0, Collections.singletonList(Uuid.ZERO_UUID));
                 CompletableFuture<List<CreatePartitionsTopicResult>> createPartitionsFuture =
                     controller.createPartitions(context0, Collections.singletonList(
-                        new CreatePartitionsTopic()));
+                        new CreatePartitionsTopic()), false);
                 CompletableFuture<ElectLeadersResponseData> electLeadersFuture =
                     controller.electLeaders(context0, new ElectLeadersRequestData().setTimeoutMs(0).
                         setTopicPartitions(null));
@@ -862,7 +862,7 @@ public class QuorumControllerTest {
                 CompletableFuture<Map<Uuid, ResultOrError<String>>> findTopicNamesFuture =
                     controller.findTopicNames(ANONYMOUS_CONTEXT, Collections.emptyList());
                 CompletableFuture<List<CreatePartitionsTopicResult>> createPartitionsFuture =
-                    controller.createPartitions(ANONYMOUS_CONTEXT, Collections.emptyList());
+                    controller.createPartitions(ANONYMOUS_CONTEXT, Collections.emptyList(), false);
                 CompletableFuture<ElectLeadersResponseData> electLeadersFuture =
                     controller.electLeaders(ANONYMOUS_CONTEXT, new ElectLeadersRequestData());
                 CompletableFuture<AlterPartitionReassignmentsResponseData> alterReassignmentsFuture =