You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2022/05/04 17:32:18 UTC
[kafka] branch trunk updated: KAFKA-13861; Fix the validateOnly behavior for CreatePartitions requests in KRaft mode (#12106)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 430f75ba22 KAFKA-13861; Fix the validateOnly behavior for CreatePartitions requests in KRaft mode (#12106)
430f75ba22 is described below
commit 430f75ba227b8f600ee7fafc2b83e2e97c38f03b
Author: Akhilesh Chaganti <ak...@users.noreply.github.com>
AuthorDate: Wed May 4 10:31:46 2022 -0700
KAFKA-13861; Fix the validateOnly behavior for CreatePartitions requests in KRaft mode (#12106)
The KRaft implementation of `CreatePartitions` ignores the `validateOnly` flag in the
request and creates the partitions whenever validation succeeds. This change fixes the
behavior so that, when the `validateOnly` flag is true, the request is validated but no partitions are created.
Reviewers: Divij Vaidya <di...@gmail.com>, dengziming <de...@gmail.com>, Jason Gustafson <ja...@confluent.io>
---
.../main/scala/kafka/server/ControllerApis.scala | 2 +-
core/src/test/java/kafka/test/MockController.java | 3 +-
.../kafka/api/PlaintextAdminIntegrationTest.scala | 119 ++++++++++++++++-----
.../unit/kafka/server/ControllerApisTest.scala | 32 ++++--
.../org/apache/kafka/controller/Controller.java | 5 +-
.../apache/kafka/controller/QuorumController.java | 16 ++-
.../controller/ReplicationControlManager.java | 2 +-
.../kafka/controller/QuorumControllerTest.java | 4 +-
8 files changed, 139 insertions(+), 44 deletions(-)
diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala b/core/src/main/scala/kafka/server/ControllerApis.scala
index 3bd4ca5fce..0aaab0aefb 100644
--- a/core/src/main/scala/kafka/server/ControllerApis.scala
+++ b/core/src/main/scala/kafka/server/ControllerApis.scala
@@ -767,7 +767,7 @@ class ControllerApis(val requestChannel: RequestChannel,
setErrorCode(TOPIC_AUTHORIZATION_FAILED.code))
}
}
- controller.createPartitions(context, topics).thenApply { results =>
+ controller.createPartitions(context, topics, request.validateOnly).thenApply { results =>
results.forEach(response => responses.add(response))
responses
}
diff --git a/core/src/test/java/kafka/test/MockController.java b/core/src/test/java/kafka/test/MockController.java
index f6bfd207d4..ff1154d211 100644
--- a/core/src/test/java/kafka/test/MockController.java
+++ b/core/src/test/java/kafka/test/MockController.java
@@ -413,7 +413,8 @@ public class MockController implements Controller {
@Override
synchronized public CompletableFuture<List<CreatePartitionsTopicResult>> createPartitions(
ControllerRequestContext context,
- List<CreatePartitionsTopic> topicList
+ List<CreatePartitionsTopic> topicList,
+ boolean validateOnly
) {
if (!active) {
CompletableFuture<List<CreatePartitionsTopicResult>> future = new CompletableFuture<>();
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index b095a6170c..81e9f692a8 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -25,12 +25,11 @@ import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import java.util.concurrent.{CountDownLatch, ExecutionException, TimeUnit}
import java.util.{Collections, Optional, Properties}
import java.{time, util}
-
import kafka.log.LogConfig
import kafka.security.authorizer.AclEntry
import kafka.server.{Defaults, DynamicConfig, KafkaConfig, KafkaServer}
import kafka.utils.TestUtils._
-import kafka.utils.{Log4jController, TestUtils}
+import kafka.utils.{Log4jController, TestInfoUtils, TestUtils}
import kafka.zk.KafkaZkClient
import org.apache.kafka.clients.HostResolver
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
@@ -425,8 +424,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
checkValidAlterConfigs(client, topicResource1, topicResource2)
}
- @Test
- def testCreatePartitions(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testCreatePartitions(quorum: String): Unit = {
client = Admin.create(createConfig)
// Create topics
@@ -490,7 +490,12 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
var e = assertThrows(classOf[ExecutionException], () => alterResult.values.get(topic1).get,
() => s"$desc: Expect InvalidPartitionsException when newCount is a decrease")
assertTrue(e.getCause.isInstanceOf[InvalidPartitionsException], desc)
- assertEquals("Topic currently has 3 partitions, which is higher than the requested 1.", e.getCause.getMessage, desc)
+ var exceptionMsgStr = if (isKRaftTest()) {
+ "The topic create-partitions-topic-1 currently has 3 partition(s); 1 would not be an increase."
+ } else {
+ "Topic currently has 3 partitions, which is higher than the requested 1."
+ }
+ assertEquals(exceptionMsgStr, e.getCause.getMessage, desc)
assertEquals(3, numPartitions(topic1), desc)
// try a newCount which would be a noop (without assignment)
@@ -499,7 +504,12 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
e = assertThrows(classOf[ExecutionException], () => alterResult.values.get(topic2).get,
() => s"$desc: Expect InvalidPartitionsException when requesting a noop")
assertTrue(e.getCause.isInstanceOf[InvalidPartitionsException], desc)
- assertEquals("Topic already has 3 partitions.", e.getCause.getMessage, desc)
+ exceptionMsgStr = if (isKRaftTest()) {
+ "Topic already has 3 partition(s)."
+ } else {
+ "Topic already has 3 partitions."
+ }
+ assertEquals(exceptionMsgStr, e.getCause.getMessage, desc)
assertEquals(3, numPartitions(topic2, Some(3)), desc)
// try a newCount which would be a noop (where the assignment matches current state)
@@ -507,7 +517,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
NewPartitions.increaseTo(3, newPartition2Assignments)).asJava, option)
e = assertThrows(classOf[ExecutionException], () => alterResult.values.get(topic2).get)
assertTrue(e.getCause.isInstanceOf[InvalidPartitionsException], desc)
- assertEquals("Topic already has 3 partitions.", e.getCause.getMessage, desc)
+ assertEquals(exceptionMsgStr, e.getCause.getMessage, desc)
assertEquals(3, numPartitions(topic2, Some(3)), desc)
// try a newCount which would be a noop (where the assignment doesn't match current state)
@@ -515,7 +525,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
NewPartitions.increaseTo(3, newPartition2Assignments.asScala.reverse.toList.asJava)).asJava, option)
e = assertThrows(classOf[ExecutionException], () => alterResult.values.get(topic2).get)
assertTrue(e.getCause.isInstanceOf[InvalidPartitionsException], desc)
- assertEquals("Topic already has 3 partitions.", e.getCause.getMessage, desc)
+ assertEquals(exceptionMsgStr, e.getCause.getMessage, desc)
assertEquals(3, numPartitions(topic2, Some(3)), desc)
// try a bad topic name
@@ -525,7 +535,12 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
e = assertThrows(classOf[ExecutionException], () => alterResult.values.get(unknownTopic).get,
() => s"$desc: Expect InvalidTopicException when using an unknown topic")
assertTrue(e.getCause.isInstanceOf[UnknownTopicOrPartitionException], desc)
- assertEquals("The topic 'an-unknown-topic' does not exist.", e.getCause.getMessage, desc)
+ exceptionMsgStr = if (isKRaftTest()) {
+ "This server does not host this topic-partition."
+ } else {
+ "The topic 'an-unknown-topic' does not exist."
+ }
+ assertEquals(exceptionMsgStr, e.getCause.getMessage, desc)
// try an invalid newCount
alterResult = client.createPartitions(Map(topic1 ->
@@ -533,7 +548,12 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
e = assertThrows(classOf[ExecutionException], () => alterResult.values.get(topic1).get,
() => s"$desc: Expect InvalidPartitionsException when newCount is invalid")
assertTrue(e.getCause.isInstanceOf[InvalidPartitionsException], desc)
- assertEquals("Topic currently has 3 partitions, which is higher than the requested -22.", e.getCause.getMessage,
+ exceptionMsgStr = if (isKRaftTest()) {
+ "The topic create-partitions-topic-1 currently has 3 partition(s); -22 would not be an increase."
+ } else {
+ "Topic currently has 3 partitions, which is higher than the requested -22."
+ }
+ assertEquals(exceptionMsgStr, e.getCause.getMessage,
desc)
assertEquals(3, numPartitions(topic1), desc)
@@ -543,9 +563,14 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
e = assertThrows(classOf[ExecutionException], () => alterResult.values.get(topic1).get,
() => s"$desc: Expect InvalidPartitionsException when #brokers != replication factor")
assertTrue(e.getCause.isInstanceOf[InvalidReplicaAssignmentException], desc)
- assertEquals("Inconsistent replication factor between partitions, partition 0 has 1 " +
- "while partitions [3] have replication factors [2], respectively.",
- e.getCause.getMessage, desc)
+ exceptionMsgStr = if (isKRaftTest()) {
+ "The manual partition assignment includes a partition with 2 replica(s), but this is not " +
+ "consistent with previous partitions, which have 1 replica(s)."
+ } else {
+ "Inconsistent replication factor between partitions, partition 0 has 1 while partitions [3] " +
+ "have replication factors [2], respectively."
+ }
+ assertEquals(exceptionMsgStr, e.getCause.getMessage, desc)
assertEquals(3, numPartitions(topic1), desc)
// try #assignments < with the increase
@@ -554,7 +579,12 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
e = assertThrows(classOf[ExecutionException], () => alterResult.values.get(topic1).get,
() => s"$desc: Expect InvalidReplicaAssignmentException when #assignments != newCount - oldCount")
assertTrue(e.getCause.isInstanceOf[InvalidReplicaAssignmentException], desc)
- assertEquals("Increasing the number of partitions by 3 but 1 assignments provided.", e.getCause.getMessage, desc)
+ exceptionMsgStr = if (isKRaftTest()) {
+ "Attempted to add 3 additional partition(s), but only 1 assignment(s) were specified."
+ } else {
+ "Increasing the number of partitions by 3 but 1 assignments provided."
+ }
+ assertEquals(exceptionMsgStr, e.getCause.getMessage, desc)
assertEquals(3, numPartitions(topic1), desc)
// try #assignments > with the increase
@@ -562,8 +592,13 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
NewPartitions.increaseTo(4, asList(asList(1), asList(2)))).asJava, option)
e = assertThrows(classOf[ExecutionException], () => alterResult.values.get(topic1).get,
() => s"$desc: Expect InvalidReplicaAssignmentException when #assignments != newCount - oldCount")
+ exceptionMsgStr = if (isKRaftTest()) {
+ "Attempted to add 1 additional partition(s), but only 2 assignment(s) were specified."
+ } else {
+ "Increasing the number of partitions by 1 but 2 assignments provided."
+ }
assertTrue(e.getCause.isInstanceOf[InvalidReplicaAssignmentException], desc)
- assertEquals("Increasing the number of partitions by 1 but 2 assignments provided.", e.getCause.getMessage, desc)
+ assertEquals(exceptionMsgStr, e.getCause.getMessage, desc)
assertEquals(3, numPartitions(topic1), desc)
// try with duplicate brokers in assignments
@@ -572,8 +607,12 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
e = assertThrows(classOf[ExecutionException], () => alterResult.values.get(topic1).get,
() => s"$desc: Expect InvalidReplicaAssignmentException when assignments has duplicate brokers")
assertTrue(e.getCause.isInstanceOf[InvalidReplicaAssignmentException], desc)
- assertEquals("Duplicate brokers not allowed in replica assignment: 1, 1 for partition id 3.",
- e.getCause.getMessage, desc)
+ exceptionMsgStr = if (isKRaftTest()) {
+ "The manual partition assignment includes the broker 1 more than once."
+ } else {
+ "Duplicate brokers not allowed in replica assignment: 1, 1 for partition id 3."
+ }
+ assertEquals(exceptionMsgStr, e.getCause.getMessage, desc)
assertEquals(3, numPartitions(topic1), desc)
// try assignments with differently sized inner lists
@@ -582,8 +621,14 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
e = assertThrows(classOf[ExecutionException], () => alterResult.values.get(topic1).get,
() => s"$desc: Expect InvalidReplicaAssignmentException when assignments have differently sized inner lists")
assertTrue(e.getCause.isInstanceOf[InvalidReplicaAssignmentException], desc)
- assertEquals("Inconsistent replication factor between partitions, partition 0 has 1 " +
- "while partitions [4] have replication factors [2], respectively.", e.getCause.getMessage, desc)
+ exceptionMsgStr = if (isKRaftTest()) {
+ "The manual partition assignment includes a partition with 2 replica(s), but this is not " +
+ "consistent with previous partitions, which have 1 replica(s)."
+ } else {
+ "Inconsistent replication factor between partitions, partition 0 has 1 " +
+ "while partitions [4] have replication factors [2], respectively."
+ }
+ assertEquals(exceptionMsgStr, e.getCause.getMessage, desc)
assertEquals(3, numPartitions(topic1), desc)
// try assignments with unknown brokers
@@ -592,7 +637,12 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
e = assertThrows(classOf[ExecutionException], () => alterResult.values.get(topic1).get,
() => s"$desc: Expect InvalidReplicaAssignmentException when assignments contains an unknown broker")
assertTrue(e.getCause.isInstanceOf[InvalidReplicaAssignmentException], desc)
- assertEquals("Unknown broker(s) in replica assignment: 12.", e.getCause.getMessage, desc)
+ exceptionMsgStr = if (isKRaftTest()) {
+ "The manual partition assignment includes broker 12, but no such broker is registered."
+ } else {
+ "Unknown broker(s) in replica assignment: 12."
+ }
+ assertEquals(exceptionMsgStr, e.getCause.getMessage, desc)
assertEquals(3, numPartitions(topic1), desc)
// try with empty assignments
@@ -601,7 +651,12 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
e = assertThrows(classOf[ExecutionException], () => alterResult.values.get(topic1).get,
() => s"$desc: Expect InvalidReplicaAssignmentException when assignments is empty")
assertTrue(e.getCause.isInstanceOf[InvalidReplicaAssignmentException], desc)
- assertEquals("Increasing the number of partitions by 1 but 0 assignments provided.", e.getCause.getMessage, desc)
+ exceptionMsgStr = if (isKRaftTest()) {
+ "Attempted to add 1 additional partition(s), but only 0 assignment(s) were specified."
+ } else {
+ "Increasing the number of partitions by 1 but 0 assignments provided."
+ }
+ assertEquals(exceptionMsgStr, e.getCause.getMessage, desc)
assertEquals(3, numPartitions(topic1), desc)
}
@@ -614,18 +669,30 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
TestUtils.waitUntilTrue(() => numPartitions(topic1) == 4, "Timed out waiting for new partitions to appear")
var e = assertThrows(classOf[ExecutionException], () => alterResult.values.get(topic2).get)
assertTrue(e.getCause.isInstanceOf[InvalidPartitionsException])
- assertEquals("Topic currently has 3 partitions, which is higher than the requested 2.", e.getCause.getMessage)
+ val exceptionMsgStr = if (isKRaftTest()) {
+ "The topic create-partitions-topic-2 currently has 3 partition(s); 2 would not be an increase."
+ } else {
+ "Topic currently has 3 partitions, which is higher than the requested 2."
+ }
+ assertEquals(exceptionMsgStr, e.getCause.getMessage)
assertEquals(3, numPartitions(topic2))
- // finally, try to add partitions to a topic queued for deletion
+ // Delete the topic. Verify addition of partitions to deleted topic is not possible. In
+ // Zookeeper mode, the topic is queued for deletion. In KRaft, the deletion occurs
+ // immediately and hence we have a different Exception thrown in the response.
val deleteResult = client.deleteTopics(asList(topic1))
deleteResult.topicNameValues.get(topic1).get
alterResult = client.createPartitions(Map(topic1 ->
NewPartitions.increaseTo(4)).asJava, validateOnly)
e = assertThrows(classOf[ExecutionException], () => alterResult.values.get(topic1).get,
- () => "Expect InvalidTopicException when the topic is queued for deletion")
- assertTrue(e.getCause.isInstanceOf[InvalidTopicException])
- assertEquals("The topic is queued for deletion.", e.getCause.getMessage)
+ () => "Expect InvalidTopicException or UnknownTopicOrPartitionException when the topic is queued for deletion")
+ if (isKRaftTest()) {
+ assertTrue(e.getCause.isInstanceOf[UnknownTopicOrPartitionException], e.toString)
+ assertEquals("This server does not host this topic-partition.", e.getCause.getMessage)
+ } else {
+ assertTrue(e.getCause.isInstanceOf[InvalidTopicException], e.toString)
+ assertEquals("The topic is queued for deletion.", e.getCause.getMessage)
+ }
}
@Test
diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
index d28ee38db6..9f0ae482c5 100644
--- a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
@@ -20,9 +20,8 @@ package kafka.server
import java.net.InetAddress
import java.util
import java.util.Collections.singletonList
-import java.util.Properties
+import java.util.{Collections, Properties}
import java.util.concurrent.{CompletableFuture, ExecutionException}
-
import kafka.network.RequestChannel
import kafka.raft.RaftManager
import kafka.server.QuotaFactory.QuotaManagers
@@ -59,6 +58,8 @@ import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, A
import org.apache.kafka.server.common.ApiMessageAndVersion
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, Test}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
import org.mockito.ArgumentMatchers._
import org.mockito.Mockito._
import org.mockito.{ArgumentCaptor, ArgumentMatchers}
@@ -717,11 +718,10 @@ class ControllerApisTest {
_ => Set("foo", "bar")))
}
- @Test
- def testCreatePartitionsRequest(): Unit = {
- val controller = new MockController.Builder().
- newInitialTopic("foo", Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q")).
- newInitialTopic("bar", Uuid.fromString("VlFu5c51ToiNx64wtwkhQw")).build()
+ @ParameterizedTest
+ @ValueSource(booleans = Array(true, false))
+ def testCreatePartitionsRequest(validateOnly: Boolean): Unit = {
+ val controller = mock(classOf[Controller])
val controllerApis = createControllerApis(None, controller)
val request = new CreatePartitionsRequestData()
request.topics().add(new CreatePartitionsTopic().setName("foo").setAssignments(null).setCount(5))
@@ -729,9 +729,23 @@ class ControllerApisTest {
request.topics().add(new CreatePartitionsTopic().setName("bar").setAssignments(null).setCount(5))
request.topics().add(new CreatePartitionsTopic().setName("bar").setAssignments(null).setCount(5))
request.topics().add(new CreatePartitionsTopic().setName("baz").setAssignments(null).setCount(5))
+ request.setValidateOnly(validateOnly)
+
+ // Check if the controller is called correctly with the 'validateOnly' field set appropriately.
+ when(controller.createPartitions(
+ any(),
+ ArgumentMatchers.eq(
+ Collections.singletonList(
+ new CreatePartitionsTopic().setName("foo").setAssignments(null).setCount(5))),
+ ArgumentMatchers.eq(validateOnly))).thenReturn(CompletableFuture
+ .completedFuture(Collections.singletonList(
+ new CreatePartitionsTopicResult().setName("foo").
+ setErrorCode(NONE.code()).
+ setErrorMessage(null)
+ )))
assertEquals(Set(new CreatePartitionsTopicResult().setName("foo").
- setErrorCode(NONE.code()).
- setErrorMessage(null),
+ setErrorCode(NONE.code()).
+ setErrorMessage(null),
new CreatePartitionsTopicResult().setName("bar").
setErrorCode(INVALID_REQUEST.code()).
setErrorMessage("Duplicate topic name."),
diff --git a/metadata/src/main/java/org/apache/kafka/controller/Controller.java b/metadata/src/main/java/org/apache/kafka/controller/Controller.java
index 6da8fb5eda..3622fe225d 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/Controller.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/Controller.java
@@ -328,11 +328,14 @@ public interface Controller extends AclMutator, AutoCloseable {
* Create partitions on certain topics.
*
* @param topics The list of topics to create partitions for.
+ * @param validateOnly If true, the request is validated, but no partitions will be created.
+ *
* @return A future yielding per-topic results.
*/
CompletableFuture<List<CreatePartitionsTopicResult>> createPartitions(
ControllerRequestContext context,
- List<CreatePartitionsTopic> topics
+ List<CreatePartitionsTopic> topics,
+ boolean validateOnly
);
/**
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index ff8ff78520..9ff6a59957 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -1664,13 +1664,23 @@ public final class QuorumController implements Controller {
@Override
public CompletableFuture<List<CreatePartitionsTopicResult>> createPartitions(
ControllerRequestContext context,
- List<CreatePartitionsTopic> topics
+ List<CreatePartitionsTopic> topics,
+ boolean validateOnly
) {
if (topics.isEmpty()) {
return CompletableFuture.completedFuture(Collections.emptyList());
}
- return appendWriteEvent("createPartitions", context.deadlineNs(),
- () -> replicationControl.createPartitions(topics));
+
+ return appendWriteEvent("createPartitions", context.deadlineNs(), () -> {
+ final ControllerResult<List<CreatePartitionsTopicResult>> result = replicationControl.createPartitions(topics);
+ if (validateOnly) {
+ log.debug("Validate-only CreatePartitions result(s): {}", result.response());
+ return result.withoutRecords();
+ } else {
+ log.debug("CreatePartitions result(s): {}", result.response());
+ return result;
+ }
+ });
}
@Override
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index dfa3009ea2..f660938bd8 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -1364,7 +1364,7 @@ public class ReplicationControlManager {
setErrorCode(apiError.error().code()).
setErrorMessage(apiError.message()));
}
- return new ControllerResult<>(records, results, true);
+ return ControllerResult.atomicOf(records, results);
}
void createPartitions(CreatePartitionsTopic topic,
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index 9f569251de..9c679ad7d1 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -808,7 +808,7 @@ public class QuorumControllerTest {
controller.findTopicNames(context0, Collections.singletonList(Uuid.ZERO_UUID));
CompletableFuture<List<CreatePartitionsTopicResult>> createPartitionsFuture =
controller.createPartitions(context0, Collections.singletonList(
- new CreatePartitionsTopic()));
+ new CreatePartitionsTopic()), false);
CompletableFuture<ElectLeadersResponseData> electLeadersFuture =
controller.electLeaders(context0, new ElectLeadersRequestData().setTimeoutMs(0).
setTopicPartitions(null));
@@ -862,7 +862,7 @@ public class QuorumControllerTest {
CompletableFuture<Map<Uuid, ResultOrError<String>>> findTopicNamesFuture =
controller.findTopicNames(ANONYMOUS_CONTEXT, Collections.emptyList());
CompletableFuture<List<CreatePartitionsTopicResult>> createPartitionsFuture =
- controller.createPartitions(ANONYMOUS_CONTEXT, Collections.emptyList());
+ controller.createPartitions(ANONYMOUS_CONTEXT, Collections.emptyList(), false);
CompletableFuture<ElectLeadersResponseData> electLeadersFuture =
controller.electLeaders(ANONYMOUS_CONTEXT, new ElectLeadersRequestData());
CompletableFuture<AlterPartitionReassignmentsResponseData> alterReassignmentsFuture =