Posted to commits@kafka.apache.org by cm...@apache.org on 2022/08/02 22:39:56 UTC

[kafka] branch trunk updated: KAFKA-14129: KRaft must check manual assignments for createTopics are contiguous (#12467)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0c4da23098 KAFKA-14129: KRaft must check manual assignments for createTopics are contiguous (#12467)
0c4da23098 is described below

commit 0c4da23098f8b8ae9542acd7fbaa1e5c16384a39
Author: Colin Patrick McCabe <cm...@apache.org>
AuthorDate: Tue Aug 2 15:39:47 2022 -0700

    KAFKA-14129: KRaft must check manual assignments for createTopics are contiguous (#12467)
    
    KRaft should validate that manual assignments given to createTopics are contiguous. In other words,
    they must start with partition 0, and progress through 1, 2, 3, etc. ZK mode does this, but KRaft
    mode previously did not. Also fix a null pointer exception when the placement for partition 0
    was not specified.
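    
    To make the rule concrete, here is a minimal standalone Scala sketch of the
    contiguity check (the committed implementation is the Java loop in
    ReplicationControlManager below; the names here are illustrative only):
    
        // Valid only when the assigned partition ids are exactly 0, 1, ..., n-1.
        def isContiguous(assignments: Map[Int, Seq[Int]]): Boolean =
          (0 until assignments.size).forall(assignments.contains)
    
        isContiguous(Map(0 -> Seq(0, 1), 1 -> Seq(1, 0)))  // true
        isContiguous(Map(1 -> Seq(0, 1), 2 -> Seq(1, 0)))  // false: partition 0 missing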
    
    Convert AddPartitionsTest over to using KRaft. This PR converts all of the tests except for some
    of the placement logic tests, which will need to be redone for KRaft mode in a future change.
    
    Fix a null pointer exception in KRaftMetadataCache#getPartitionInfo. Specifically, we should not
    assume that the partition will be found in the hash map. This is another case where we had
    "Some(x)" when it should have been "Option(x)".
    
    Fix a potential null pointer exception in BrokerServer#state.
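    
    The guard pattern, sketched standalone in Scala (assuming the kafka.server
    types touched by this patch; the committed change is in BrokerServer.scala
    below, written with an equivalent flatMap):
    
        class StateHolder {
          // Assigned later during startup, so an early reader may observe null.
          @volatile var manager: BrokerLifecycleManager = null
          def state: BrokerState =
            Option(manager).map(_.state).getOrElse(BrokerState.NOT_RUNNING)
        }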
    
    Reviewers: dengziming <de...@gmail.com>, Jason Gustafson <ja...@confluent.io>
---
 .../src/main/scala/kafka/server/BrokerServer.scala |   5 +-
 .../kafka/server/metadata/KRaftMetadataCache.scala |   2 +-
 .../scala/unit/kafka/admin/AddPartitionsTest.scala | 164 ++++++++++++++-------
 .../controller/ReplicationControlManager.java      |   8 +-
 4 files changed, 123 insertions(+), 56 deletions(-)

diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index eb21c1ed25..0bdd673497 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -81,7 +81,8 @@ class BrokerServer(
   val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, AddressSpec]]
 ) extends KafkaBroker {
 
-  override def brokerState: BrokerState = lifecycleManager.state
+  override def brokerState: BrokerState = Option(lifecycleManager).
+    flatMap(m => Some(m.state)).getOrElse(BrokerState.NOT_RUNNING)
 
   import kafka.server.Server._
 
@@ -89,7 +90,7 @@ class BrokerServer(
 
   this.logIdent = logContext.logPrefix
 
-  @volatile private var lifecycleManager: BrokerLifecycleManager = null
+  @volatile var lifecycleManager: BrokerLifecycleManager = null
 
   private val isShuttingDown = new AtomicBoolean(false)
 
diff --git a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
index ae2e652357..5257721150 100644
--- a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
+++ b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
@@ -229,7 +229,7 @@ class KRaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging w
 
   override def getPartitionInfo(topicName: String, partitionId: Int): Option[UpdateMetadataPartitionState] = {
     Option(_currentImage.topics().getTopic(topicName)).
-      flatMap(topic => Some(topic.partitions().get(partitionId))).
+      flatMap(topic => Option(topic.partitions().get(partitionId))).
       flatMap(partition => Some(new UpdateMetadataPartitionState().
         setTopicName(topicName).
         setPartitionIndex(partitionId).
diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
index ea4215d9c3..4e2bfee60e 100755
--- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
@@ -17,18 +17,24 @@
 
 package kafka.admin
 
-import java.util.Optional
+import java.util.{Collections, Optional}
 import kafka.controller.ReplicaAssignment
-import kafka.server.BaseRequestTest
-import kafka.utils.TestUtils
+import kafka.server.{BaseRequestTest, BrokerServer}
+import kafka.utils.{TestInfoUtils, TestUtils}
 import kafka.utils.TestUtils._
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.clients.admin.{Admin, NewPartitions, NewTopic}
 import org.apache.kafka.common.errors.InvalidReplicaAssignmentException
 import org.apache.kafka.common.requests.MetadataResponse.TopicMetadata
 import org.apache.kafka.common.requests.{MetadataRequest, MetadataResponse}
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
-
+import org.junit.jupiter.api.{BeforeEach, TestInfo}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
+
+import java.util
+import java.util.Arrays.asList
+import java.util.Collections.singletonList
+import java.util.concurrent.ExecutionException
 import scala.jdk.CollectionConverters._
 
 class AddPartitionsTest extends BaseRequestTest {
@@ -47,44 +53,97 @@ class AddPartitionsTest extends BaseRequestTest {
   val topic4Assignment = Map(0 -> ReplicaAssignment(Seq(0,3), List(), List()))
   val topic5 = "new-topic5"
   val topic5Assignment = Map(1 -> ReplicaAssignment(Seq(0,1), List(), List()))
+  var admin: Admin = null
 
   @BeforeEach
   override def setUp(testInfo: TestInfo): Unit = {
     super.setUp(testInfo)
 
+    if (isKRaftTest()) {
+      brokers.foreach(broker => broker.asInstanceOf[BrokerServer].lifecycleManager.initialUnfenceFuture.get())
+    }
     createTopicWithAssignment(topic1, partitionReplicaAssignment = topic1Assignment.map { case (k, v) => k -> v.replicas })
     createTopicWithAssignment(topic2, partitionReplicaAssignment = topic2Assignment.map { case (k, v) => k -> v.replicas })
     createTopicWithAssignment(topic3, partitionReplicaAssignment = topic3Assignment.map { case (k, v) => k -> v.replicas })
     createTopicWithAssignment(topic4, partitionReplicaAssignment = topic4Assignment.map { case (k, v) => k -> v.replicas })
+    admin = createAdminClient()
   }
 
-  @Test
-  def testWrongReplicaCount(): Unit = {
-    assertThrows(classOf[InvalidReplicaAssignmentException], () => adminZkClient.addPartitions(topic1, topic1Assignment, adminZkClient.getBrokerMetadatas(), 2,
-      Some(Map(0 -> Seq(0, 1), 1 -> Seq(0, 1, 2)))))
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testWrongReplicaCount(quorum: String): Unit = {
+    assertEquals(classOf[InvalidReplicaAssignmentException], assertThrows(classOf[ExecutionException], () => {
+        admin.createPartitions(Collections.singletonMap(topic1,
+          NewPartitions.increaseTo(2, singletonList(asList(0, 1, 2))))).all().get()
+      }).getCause.getClass)
   }
 
-  @Test
-  def testMissingPartition0(): Unit = {
-    val e = assertThrows(classOf[AdminOperationException], () => adminZkClient.addPartitions(topic5, topic5Assignment, adminZkClient.getBrokerMetadatas(), 2,
-      Some(Map(1 -> Seq(0, 1), 2 -> Seq(0, 1, 2)))))
-    assertTrue(e.getMessage.contains("Unexpected existing replica assignment for topic 'new-topic5', partition id 0 is missing"))
+  /**
+   * Test that when we supply a manual partition assignment to createTopics, it must be 0-based
+   * and consecutive.
+   */
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testMissingPartitionsInCreateTopics(quorum: String): Unit = {
+    val topic6Placements = new util.HashMap[Integer, util.List[Integer]]
+    topic6Placements.put(1, asList(0, 1))
+    topic6Placements.put(2, asList(1, 0))
+    val topic7Placements = new util.HashMap[Integer, util.List[Integer]]
+    topic7Placements.put(2, asList(0, 1))
+    topic7Placements.put(3, asList(1, 0))
+    val futures = admin.createTopics(asList(
+      new NewTopic("new-topic6", topic6Placements),
+      new NewTopic("new-topic7", topic7Placements))).values()
+    val topic6Cause = assertThrows(classOf[ExecutionException], () => futures.get("new-topic6").get()).getCause
+    assertEquals(classOf[InvalidReplicaAssignmentException], topic6Cause.getClass)
+    assertTrue(topic6Cause.getMessage.contains("partitions should be a consecutive 0-based integer sequence"),
+      "Unexpected error message: " + topic6Cause.getMessage)
+    val topic7Cause = assertThrows(classOf[ExecutionException], () => futures.get("new-topic7").get()).getCause
+    assertEquals(classOf[InvalidReplicaAssignmentException], topic7Cause.getClass)
+    assertTrue(topic7Cause.getMessage.contains("partitions should be a consecutive 0-based integer sequence"),
+      "Unexpected error message: " + topic7Cause.getMessage)
   }
 
-  @Test
-  def testIncrementPartitions(): Unit = {
-    adminZkClient.addPartitions(topic1, topic1Assignment, adminZkClient.getBrokerMetadatas(), 3)
+  /**
+   * Test that when we supply a manual partition assignment to createPartitions, it must contain
+   * enough partitions.
+   */
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testMissingPartitionsInCreatePartitions(quorum: String): Unit = {
+    val cause = assertThrows(classOf[ExecutionException], () =>
+      admin.createPartitions(Collections.singletonMap(topic1,
+        NewPartitions.increaseTo(3, singletonList(asList(0, 1, 2))))).all().get()).getCause
+    assertEquals(classOf[InvalidReplicaAssignmentException], cause.getClass)
+    if (isKRaftTest()) {
+      assertTrue(cause.getMessage.contains("Attempted to add 2 additional partition(s), but only 1 assignment(s) " +
+        "were specified."), "Unexpected error message: " + cause.getMessage)
+    } else {
+      assertTrue(cause.getMessage.contains("Increasing the number of partitions by 2 but 1 assignments provided."),
+        "Unexpected error message: " + cause.getMessage)
+    }
+    if (!isKRaftTest()) {
+      // In ZK mode, test the raw AdminZkClient method as well.
+      val e = assertThrows(classOf[AdminOperationException], () => adminZkClient.addPartitions(
+        topic5, topic5Assignment, adminZkClient.getBrokerMetadatas(), 2,
+        Some(Map(1 -> Seq(0, 1), 2 -> Seq(0, 1, 2)))))
+      assertTrue(e.getMessage.contains("Unexpected existing replica assignment for topic 'new-topic5', partition " +
+        "id 0 is missing"))
+    }
+  }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testIncrementPartitions(quorum: String): Unit = {
+    admin.createPartitions(Collections.singletonMap(topic1, NewPartitions.increaseTo(3))).all().get()
+
     // wait until leader is elected
-    val leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic1, 1)
-    val leader2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic1, 2)
-    val leader1FromZk = zkClient.getLeaderForPartition(new TopicPartition(topic1, 1)).get
-    val leader2FromZk = zkClient.getLeaderForPartition(new TopicPartition(topic1, 2)).get
-    assertEquals(leader1, leader1FromZk)
-    assertEquals(leader2, leader2FromZk)
+    waitUntilLeaderIsElectedOrChangedWithAdmin(admin, topic1, 1)
+    waitUntilLeaderIsElectedOrChangedWithAdmin(admin, topic1, 2)
 
     // read metadata from a broker and verify the new topic partitions exist
-    TestUtils.waitForPartitionMetadata(servers, topic1, 1)
-    TestUtils.waitForPartitionMetadata(servers, topic1, 2)
+    TestUtils.waitForPartitionMetadata(brokers, topic1, 1)
+    TestUtils.waitForPartitionMetadata(brokers, topic1, 2)
     val response = connectAndReceive[MetadataResponse](
       new MetadataRequest.Builder(Seq(topic1).asJava, false).build)
     assertEquals(1, response.topicMetadata.size)
@@ -102,22 +161,21 @@ class AddPartitionsTest extends BaseRequestTest {
     }
   }
 
-  @Test
-  def testManualAssignmentOfReplicas(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testManualAssignmentOfReplicas(quorum: String): Unit = {
     // Add 2 partitions
-    adminZkClient.addPartitions(topic2, topic2Assignment, adminZkClient.getBrokerMetadatas(), 3,
-      Some(Map(0 -> Seq(1, 2), 1 -> Seq(0, 1), 2 -> Seq(2, 3))))
+    admin.createPartitions(Collections.singletonMap(topic2, NewPartitions.increaseTo(3,
+      asList(asList(0, 1), asList(2, 3))))).all().get()
     // wait until leader is elected
-    val leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic2, 1)
-    val leader2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic2, 2)
-    val leader1FromZk = zkClient.getLeaderForPartition(new TopicPartition(topic2, 1)).get
-    val leader2FromZk = zkClient.getLeaderForPartition(new TopicPartition(topic2, 2)).get
-    assertEquals(leader1, leader1FromZk)
-    assertEquals(leader2, leader2FromZk)
+    val leader1 = waitUntilLeaderIsElectedOrChangedWithAdmin(admin, topic2, 1)
+    val leader2 = waitUntilLeaderIsElectedOrChangedWithAdmin(admin, topic2, 2)
 
     // read metadata from a broker and verify the new topic partitions exist
-    TestUtils.waitForPartitionMetadata(servers, topic2, 1)
-    TestUtils.waitForPartitionMetadata(servers, topic2, 2)
+    val partition1Metadata = TestUtils.waitForPartitionMetadata(brokers, topic2, 1)
+    assertEquals(leader1, partition1Metadata.leader())
+    val partition2Metadata = TestUtils.waitForPartitionMetadata(brokers, topic2, 2)
+    assertEquals(leader2, partition2Metadata.leader())
     val response = connectAndReceive[MetadataResponse](
       new MetadataRequest.Builder(Seq(topic2).asJava, false).build)
     assertEquals(1, response.topicMetadata.size)
@@ -132,17 +190,18 @@ class AddPartitionsTest extends BaseRequestTest {
     assertEquals(Set(0, 1), replicas.asScala.toSet)
   }
 
-  @Test
-  def testReplicaPlacementAllServers(): Unit = {
-    adminZkClient.addPartitions(topic3, topic3Assignment, adminZkClient.getBrokerMetadatas(), 7)
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk")) // TODO: add kraft support
+  def testReplicaPlacementAllServers(quorum: String): Unit = {
+    admin.createPartitions(Collections.singletonMap(topic3, NewPartitions.increaseTo(7))).all().get()
 
     // read metadata from a broker and verify the new topic partitions exist
-    TestUtils.waitForPartitionMetadata(servers, topic3, 1)
-    TestUtils.waitForPartitionMetadata(servers, topic3, 2)
-    TestUtils.waitForPartitionMetadata(servers, topic3, 3)
-    TestUtils.waitForPartitionMetadata(servers, topic3, 4)
-    TestUtils.waitForPartitionMetadata(servers, topic3, 5)
-    TestUtils.waitForPartitionMetadata(servers, topic3, 6)
+    TestUtils.waitForPartitionMetadata(brokers, topic3, 1)
+    TestUtils.waitForPartitionMetadata(brokers, topic3, 2)
+    TestUtils.waitForPartitionMetadata(brokers, topic3, 3)
+    TestUtils.waitForPartitionMetadata(brokers, topic3, 4)
+    TestUtils.waitForPartitionMetadata(brokers, topic3, 5)
+    TestUtils.waitForPartitionMetadata(brokers, topic3, 6)
 
     val response = connectAndReceive[MetadataResponse](
       new MetadataRequest.Builder(Seq(topic3).asJava, false).build)
@@ -157,13 +216,14 @@ class AddPartitionsTest extends BaseRequestTest {
     validateLeaderAndReplicas(topicMetadata, 6, 0, Set(0, 1, 2, 3))
   }
 
-  @Test
-  def testReplicaPlacementPartialServers(): Unit = {
-    adminZkClient.addPartitions(topic2, topic2Assignment, adminZkClient.getBrokerMetadatas(), 3)
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk")) // TODO: add kraft support
+  def testReplicaPlacementPartialServers(quorum: String): Unit = {
+    admin.createPartitions(Collections.singletonMap(topic2, NewPartitions.increaseTo(3))).all().get()
 
     // read metadata from a broker and verify the new topic partitions exist
-    TestUtils.waitForPartitionMetadata(servers, topic2, 1)
-    TestUtils.waitForPartitionMetadata(servers, topic2, 2)
+    TestUtils.waitForPartitionMetadata(brokers, topic2, 1)
+    TestUtils.waitForPartitionMetadata(brokers, topic2, 2)
 
     val response = connectAndReceive[MetadataResponse](
       new MetadataRequest.Builder(Seq(topic2).asJava, false).build)
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index 3a3788c41a..bf3a679d2c 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -666,6 +666,12 @@ public class ReplicationControlManager {
                     Replicas.toArray(assignment.brokerIds()), Replicas.toArray(isr),
                     Replicas.NONE, Replicas.NONE, isr.get(0), LeaderRecoveryState.RECOVERED, 0, 0));
             }
+            for (int i = 0; i < newParts.size(); i++) {
+                if (!newParts.containsKey(i)) {
+                    return new ApiError(Errors.INVALID_REPLICA_ASSIGNMENT,
+                            "partitions should be a consecutive 0-based integer sequence");
+                }
+            }
             ApiError error = maybeCheckCreateTopicPolicy(() -> {
                 Map<Integer, List<Integer>> assignments = new HashMap<>();
                 newParts.entrySet().forEach(e -> assignments.put(e.getKey(),
@@ -744,7 +750,7 @@ public class ReplicationControlManager {
                     setIsSensitive(entry.isSensitive()));
             }
             result.setNumPartitions(newParts.size());
-            result.setReplicationFactor((short) newParts.get(0).replicas.length);
+            result.setReplicationFactor((short) newParts.values().iterator().next().replicas.length);
             result.setTopicConfigErrorCode(NONE.code());
         } else {
             result.setTopicConfigErrorCode(TOPIC_AUTHORIZATION_FAILED.code());