Posted to jira@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/11/27 04:08:00 UTC

[jira] [Commented] (KAFKA-7382) We should guarantee at least one replica of a partition is alive when creating or updating a topic

    [ https://issues.apache.org/jira/browse/KAFKA-7382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16699906#comment-16699906 ] 

ASF GitHub Bot commented on KAFKA-7382:
---------------------------------------

sumannewton closed pull request #5822: Fixed KAFKA-7382 - guarantee atleast one replica of partition to be alive during topic creation.
URL: https://github.com/apache/kafka/pull/5822
 
 
   

As this pull request comes from a fork, GitHub does not display the original diff here, so it is reproduced below for the sake of provenance:

diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index 3ba6c1fecd9..e14aa9839f5 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -288,12 +288,7 @@ object TopicCommand extends Logging {
     val ret = new mutable.HashMap[Int, List[Int]]()
     for (i <- 0 until partitionList.size) {
       val brokerList = partitionList(i).split(":").map(s => s.trim().toInt)
-      val duplicateBrokers = CoreUtils.duplicates(brokerList)
-      if (duplicateBrokers.nonEmpty)
-        throw new AdminCommandFailedException("Partition replica lists may not contain duplicate entries: %s".format(duplicateBrokers.mkString(",")))
       ret.put(i, brokerList.toList)
-      if (ret(i).size != ret(0).size)
-        throw new AdminOperationException("Partition " + i + " has different replication factor: " + brokerList)
     }
     ret.toMap
   }
diff --git a/core/src/main/scala/kafka/utils/CommandLineUtils.scala b/core/src/main/scala/kafka/utils/CommandLineUtils.scala
index 700a137c8de..8c83b38518d 100644
--- a/core/src/main/scala/kafka/utils/CommandLineUtils.scala
+++ b/core/src/main/scala/kafka/utils/CommandLineUtils.scala
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
- package kafka.utils
+package kafka.utils
 
 import joptsimple.{OptionSpec, OptionSet, OptionParser}
 import scala.collection.Set
diff --git a/core/src/main/scala/kafka/zk/AdminZkClient.scala b/core/src/main/scala/kafka/zk/AdminZkClient.scala
index 14e7d7ef4ea..e787ebdcd79 100644
--- a/core/src/main/scala/kafka/zk/AdminZkClient.scala
+++ b/core/src/main/scala/kafka/zk/AdminZkClient.scala
@@ -53,7 +53,7 @@ class AdminZkClient(zkClient: KafkaZkClient) extends Logging {
                   rackAwareMode: RackAwareMode = RackAwareMode.Enforced) {
     val brokerMetadatas = getBrokerMetadatas(rackAwareMode)
     val replicaAssignment = AdminUtils.assignReplicasToBrokers(brokerMetadatas, partitions, replicationFactor)
-    createTopicWithAssignment(topic, topicConfig, replicaAssignment)
+    createTopicWithAssignment(topic, topicConfig, replicaAssignment, rackAwareMode)
   }
 
   /**
@@ -82,8 +82,9 @@ class AdminZkClient(zkClient: KafkaZkClient) extends Logging {
 
   def createTopicWithAssignment(topic: String,
                                 config: Properties,
-                                partitionReplicaAssignment: Map[Int, Seq[Int]]): Unit = {
-    validateTopicCreate(topic, partitionReplicaAssignment, config)
+                                partitionReplicaAssignment: Map[Int, Seq[Int]],
+                                rackAwareMode: RackAwareMode = RackAwareMode.Enforced): Unit = {
+    validateTopicCreate(topic, partitionReplicaAssignment, config, rackAwareMode)
 
     info(s"Creating topic $topic with configuration $config and initial partition " +
       s"assignment $partitionReplicaAssignment")
@@ -95,12 +96,24 @@ class AdminZkClient(zkClient: KafkaZkClient) extends Logging {
     writeTopicPartitionAssignment(topic, partitionReplicaAssignment, isUpdate = false)
   }
 
+  def validatePartitionReplicaAssignment(topic: String,
+                                         partitionReplicaAssignment: Map[Int, Seq[Int]],
+                                         rackAwareMode: RackAwareMode = RackAwareMode.Enforced): Unit = {
+    val assignmentPartition0 = partitionReplicaAssignment.getOrElse(0,
+      throw new AdminOperationException(
+        s"Unexpected replica assignment for topic '$topic', partition id 0 is missing. " +
+          s"Assignment: $partitionReplicaAssignment"))
+    val allBrokers = getBrokerMetadatas(rackAwareMode)
+    validateReplicaAssignment(partitionReplicaAssignment, assignmentPartition0.size, allBrokers.map(_.id).toSet)
+  }
+
   /**
    * Validate topic creation parameters
    */
   def validateTopicCreate(topic: String,
                           partitionReplicaAssignment: Map[Int, Seq[Int]],
-                          config: Properties): Unit = {
+                          config: Properties,
+                          rackAwareMode: RackAwareMode = RackAwareMode.Enforced): Unit = {
     Topic.validate(topic)
 
     if (zkClient.topicExists(topic))
@@ -117,13 +130,8 @@ class AdminZkClient(zkClient: KafkaZkClient) extends Logging {
       }
     }
 
-    if (partitionReplicaAssignment.values.map(_.size).toSet.size != 1)
-      throw new InvalidReplicaAssignmentException("All partitions should have the same number of replicas")
-
-    partitionReplicaAssignment.values.foreach(reps =>
-      if (reps.size != reps.toSet.size)
-        throw new InvalidReplicaAssignmentException("Duplicate replica assignment found: " + partitionReplicaAssignment)
-    )
+    //Validate partition replica assignment during topic creation.
+    validatePartitionReplicaAssignment(topic, partitionReplicaAssignment, rackAwareMode)
 
     LogConfig.validate(config)
   }
@@ -224,11 +232,13 @@ class AdminZkClient(zkClient: KafkaZkClient) extends Logging {
         throw new InvalidReplicaAssignmentException(
           s"Duplicate brokers not allowed in replica assignment: " +
             s"${replicas.mkString(", ")} for partition id $partitionId.")
-      if (!replicas.toSet.subsetOf(availableBrokerIds))
+      // Topic creation should be interrupted if all the replicas are unavailable or not alive.
+      // Atleast one replica should be alive for topic creation.
+      if ((replicas.toSet -- availableBrokerIds).size == replicas.size)
         throw new BrokerNotAvailableException(
-          s"Some brokers specified for partition id $partitionId are not available. " +
+          s"All brokers specified for partition id $partitionId are not available. " +
             s"Specified brokers: ${replicas.mkString(", ")}, " +
-            s"available brokers: ${availableBrokerIds.mkString(", ")}.")
+            s"Available brokers: ${availableBrokerIds.mkString(", ")}.")
       partitionId -> replicas.size
     }
     val badRepFactors = replicaAssignment.collect {
diff --git a/core/src/test/scala/unit/kafka/admin/PreferredReplicaElectionCommandTest.scala b/core/src/test/scala/unit/kafka/admin/PreferredReplicaElectionCommandTest.scala
index d03ec04b367..4e6eacf76f1 100644
--- a/core/src/test/scala/unit/kafka/admin/PreferredReplicaElectionCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/PreferredReplicaElectionCommandTest.scala
@@ -1,37 +1,45 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
 package kafka.admin
 
 import java.util.Properties
 
+import kafka.admin.ReassignPartitionsCommand.Throttle
 import kafka.server.{KafkaConfig, KafkaServer}
+import kafka.utils.TestUtils._
 import kafka.utils.{Logging, TestUtils}
-import kafka.zk.ZooKeeperTestHarness
+import kafka.zk.{ReassignPartitionsZNode, ZooKeeperTestHarness}
+import org.apache.kafka.clients.admin.{AdminClientConfig, AdminClient => JAdminClient}
 import org.apache.kafka.common.TopicPartition
 import org.junit.Assert.assertEquals
 import org.junit.{After, Test}
 
-import scala.collection.{Map, Set}
+import scala.collection.{Map, Seq, Set}
 
 class PreferredReplicaElectionCommandTest extends ZooKeeperTestHarness with Logging {
   var servers: Seq[KafkaServer] = Seq()
+  var adminClient: JAdminClient = null
 
   @After
   override def tearDown() {
+    if (adminClient != null) {
+      adminClient.close()
+      adminClient = null
+    }
     TestUtils.shutdownServers(servers)
     super.tearDown()
   }
@@ -47,19 +55,38 @@ class PreferredReplicaElectionCommandTest extends ZooKeeperTestHarness with Logg
       partitionsUndergoingPreferredReplicaElection)
   }
 
+  def createAdminClient(servers: Seq[KafkaServer]): JAdminClient = {
+    val props = new Properties()
+    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.getBrokerListStrFromServers(servers))
+    props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+    JAdminClient.create(props)
+  }
+
+  def waitForReassignmentToComplete(pause: Long = 100L) {
+    waitUntilTrue(() => !zkClient.reassignPartitionsInProgress,
+      s"Znode ${ReassignPartitionsZNode.path} wasn't deleted", pause = pause)
+  }
+
   @Test
   def testBasicPreferredReplicaElection() {
-    val expectedReplicaAssignment = Map(1  -> List(0, 1, 2))
+    val expectedReplicaAssignment = Map(0  -> List(0, 1, 2))
     val topic = "test"
-    val partition = 1
-    val preferredReplica = 0
+    val partition = 0
+    val preferredReplica = 2
     // create brokers
     val brokerRack = Map(0 -> "rack0", 1 -> "rack1", 2 -> "rack2")
     val serverConfigs = TestUtils.createBrokerConfigs(3, zkConnect, false, rackInfo = brokerRack).map(KafkaConfig.fromProps)
-    // create the topic
-    adminZkClient.createTopicWithAssignment(topic, config = new Properties, expectedReplicaAssignment)
     servers = serverConfigs.reverseMap(s => TestUtils.createServer(s))
-    // broker 2 should be the leader since it was started first
+
+    // create the topic. Current leader will be 0.
+    adminZkClient.createTopicWithAssignment(topic, config = new Properties, expectedReplicaAssignment)
+
+    // Reassign the partition so that preferred replica election chooses 2 as new leader.
+    adminClient = createAdminClient(servers)
+    val topicJson: String = s"""{"version":1,"partitions":[{"topic":"$topic","partition":$partition,"replicas":[2, 1, 0]}]}"""
+    ReassignPartitionsCommand.executeAssignment(zkClient, Some(adminClient), topicJson, Throttle(-1, -1))
+    waitForReassignmentToComplete()
+
     val currentLeader = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, partition, oldLeaderOpt = None)
     // trigger preferred replica election
     val preferredReplicaElection = new PreferredReplicaLeaderElectionCommand(zkClient, Set(new TopicPartition(topic, partition)))
diff --git a/core/src/test/scala/unit/kafka/admin/RackAwareTest.scala b/core/src/test/scala/unit/kafka/admin/RackAwareTest.scala
index facc7458333..79018b0eebf 100644
--- a/core/src/test/scala/unit/kafka/admin/RackAwareTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/RackAwareTest.scala
@@ -27,6 +27,7 @@ trait RackAwareTest {
                                numPartitions: Int,
                                replicationFactor: Int,
                                verifyRackAware: Boolean = true,
+                               verifyRackUnAware: Boolean = false,
                                verifyLeaderDistribution: Boolean = true,
                                verifyReplicasDistribution: Boolean = true) {
     // always verify that no broker will be assigned for more than one replica
@@ -40,6 +41,11 @@ trait RackAwareTest {
       assertEquals("More than one replica of the same partition is assigned to the same rack",
         List.fill(numPartitions)(replicationFactor), partitionRackMap.values.toList.map(_.distinct.size))
     }
+    if (verifyRackUnAware) {
+      val partitionRackMap = distribution.partitionRacks
+      assertNotEquals("All replicas of all partitions are assigned to different racks",
+        List.fill(numPartitions)(replicationFactor), partitionRackMap.values.toList.map(_.distinct.size))
+    }
 
     if (verifyLeaderDistribution) {
       val leaderCount = distribution.brokerLeaderCount
diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
index f94abae0e7f..fc1960a6a25 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
@@ -527,6 +527,9 @@ class ReassignPartitionsCommandTest extends ZooKeeperTestHarness with Logging {
   def testResumePartitionReassignmentThatWasCompleted() {
     val expectedReplicaAssignment = Map(0  -> List(0, 1))
     val topic = "test"
+
+    // create brokers
+    servers = TestUtils.createBrokerConfigs(2, zkConnect, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
     // create the topic
     adminZkClient.createTopicWithAssignment(topic, config = new Properties, expectedReplicaAssignment)
     // put the partition in the reassigned path as well
@@ -536,8 +539,6 @@ class ReassignPartitionsCommandTest extends ZooKeeperTestHarness with Logging {
     val topicAndPartition = new TopicPartition(topic, partitionToBeReassigned)
     val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, None, Map(topicAndPartition -> newReplicas), adminZkClient = adminZkClient)
     reassignPartitionsCommand.reassignPartitions()
-    // create brokers
-    servers = TestUtils.createBrokerConfigs(2, zkConnect, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
 
     // wait until reassignment completes
     TestUtils.waitUntilTrue(() => !zkClient.reassignPartitionsInProgress(),
diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
index a469f8efe04..783d68c665d 100644
--- a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
@@ -14,19 +14,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package kafka.admin
+package unit.kafka.admin
 
-import org.junit.Assert._
-import org.junit.Test
-import kafka.utils.Logging
-import kafka.utils.TestUtils
-import kafka.zk.{ConfigEntityChangeNotificationZNode, ZooKeeperTestHarness}
-import kafka.server.ConfigType
+import kafka.admin.{AdminOperationException, RackAwareTest, TopicCommand}
 import kafka.admin.TopicCommand.TopicCommandOptions
+import kafka.server.ConfigType
+import kafka.utils.{Logging, TestUtils}
 import kafka.utils.ZkUtils.getDeleteTopicPath
-import org.apache.kafka.common.errors.TopicExistsException
-import org.apache.kafka.common.internals.Topic
+import kafka.zk.{ConfigEntityChangeNotificationZNode, ZooKeeperTestHarness}
 import org.apache.kafka.common.config.ConfigException
+import org.apache.kafka.common.errors.{BrokerNotAvailableException, InvalidReplicaAssignmentException, TopicExistsException}
+import org.apache.kafka.common.internals.Topic
+import org.junit.Assert._
+import org.junit.Test
 
 class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
 
@@ -160,6 +160,95 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareT
     TopicCommand.createTopic(zkClient, createNotExistsOpts)
   }
 
+  @Test
+  def testCreateWithAllAvailableBrokersReplicaAssignment() {
+    // create brokers
+    val brokers = List(0, 1, 2)
+    TestUtils.createBrokersInZk(zkClient, brokers)
+
+    val topic = "test"
+    val replicaAssignment = "0:1,1:2,2:0"
+
+    // create the topic
+    val createOpts = new TopicCommandOptions(Array( "--topic", topic, "--replica-assignment", replicaAssignment))
+    TopicCommand.createTopic(zkClient, createOpts)
+
+    // try to re-create the topic without --if-not-exists
+    intercept[TopicExistsException] {
+      TopicCommand.createTopic(zkClient, createOpts)
+    }
+
+    // try to re-create the topic with --if-not-exists
+    val createNotExistsOpts = new TopicCommandOptions(
+      Array("--replica-assignment", replicaAssignment, "--topic", topic, "--if-not-exists"))
+    TopicCommand.createTopic(zkClient, createNotExistsOpts)
+  }
+
+  @Test
+  def testCreateWithSomeAvailableBrokersReplicaAssignment() {
+    // create brokers
+    val brokers = List(0, 1, 2)
+    TestUtils.createBrokersInZk(zkClient, brokers)
+
+    val topic = "test"
+    val replicaAssignment = "0:1,3:2,2:3"
+
+    // create the topic
+    val createOpts = new TopicCommandOptions(Array( "--topic", topic, "--replica-assignment", replicaAssignment))
+    TopicCommand.createTopic(zkClient, createOpts)
+
+    // try to re-create the topic with --if-not-exists
+    val createNotExistsOpts = new TopicCommandOptions(
+      Array("--replica-assignment", replicaAssignment, "--topic", topic, "--if-not-exists"))
+    TopicCommand.createTopic(zkClient, createNotExistsOpts)
+  }
+
+  @Test
+  def testCreateWithNoAvailableBrokersReplicaAssignment() {
+    // create brokers
+    val brokers = List(0, 1, 2)
+    TestUtils.createBrokersInZk(zkClient, brokers)
+
+    val topic = "test"
+    val replicaAssignment = "0:1,3:2,3:4"
+
+    // create the topic
+    val createOpts = new TopicCommandOptions(Array( "--topic", topic, "--replica-assignment", replicaAssignment))
+    intercept[BrokerNotAvailableException] {
+      TopicCommand.createTopic(zkClient, createOpts)
+    }
+
+    // try to re-create the topic with --if-not-exists
+    val createNotExistsOpts = new TopicCommandOptions(
+      Array("--replica-assignment", replicaAssignment, "--topic", topic, "--if-not-exists"))
+    intercept[BrokerNotAvailableException] {
+      TopicCommand.createTopic(zkClient, createNotExistsOpts)
+    }
+  }
+
+  @Test
+  def testCreateWithDifferentRFReplicaAssignment() {
+    // create brokers
+    val brokers = List(0, 1, 2)
+    TestUtils.createBrokersInZk(zkClient, brokers)
+
+    val topic = "test"
+    val replicaAssignment = "0:1,1:2,2:0:1"
+
+    // create the topic
+    val createOpts = new TopicCommandOptions(Array( "--topic", topic, "--replica-assignment", replicaAssignment))
+    intercept[InvalidReplicaAssignmentException] {
+      TopicCommand.createTopic(zkClient, createOpts)
+    }
+
+    // try to re-create the topic with --if-not-exists
+    val createNotExistsOpts = new TopicCommandOptions(
+      Array("--replica-assignment", replicaAssignment, "--topic", topic, "--if-not-exists"))
+    intercept[InvalidReplicaAssignmentException] {
+      TopicCommand.createTopic(zkClient, createNotExistsOpts)
+    }
+  }
+
   @Test
   def testCreateAlterTopicWithRackAware() {
     val rackInfo = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack1", 4 -> "rack3", 5 -> "rack3")
@@ -190,6 +279,40 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareT
     checkReplicaDistribution(assignment, rackInfo, rackInfo.size, alteredNumPartitions, replicationFactor)
   }
 
+  @Test
+  def testCreateAlterTopicWithRackAwareDisabled() {
+    val rackInfo = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack1", 4 -> "rack3", 5 -> "rack3")
+    TestUtils.createBrokersInZk(toBrokerMetadata(rackInfo), zkClient)
+
+    val numPartitions = 18
+    val replicationFactor = 3
+    val createOpts = new TopicCommandOptions(Array(
+      "--partitions", numPartitions.toString,
+      "--replication-factor", replicationFactor.toString,
+      "--topic", "foo",
+      "--disable-rack-aware"))
+    TopicCommand.createTopic(zkClient, createOpts)
+
+    var assignment = zkClient.getReplicaAssignmentForTopics(Set("foo")).map { case (tp, replicas) =>
+      tp.partition -> replicas
+    }
+    checkReplicaDistribution(assignment, rackInfo, rackInfo.size, numPartitions, replicationFactor,
+      verifyRackAware = false, verifyRackUnAware = true)
+
+    val alteredNumPartitions = 36
+    // verify that adding partitions will also be rack aware
+    val alterOpts = new TopicCommandOptions(Array(
+      "--partitions", alteredNumPartitions.toString,
+      "--topic", "foo",
+      "--disable-rack-aware"))
+    TopicCommand.alterTopic(zkClient, alterOpts)
+    assignment = zkClient.getReplicaAssignmentForTopics(Set("foo")).map { case (tp, replicas) =>
+      tp.partition -> replicas
+    }
+    checkReplicaDistribution(assignment, rackInfo, rackInfo.size, alteredNumPartitions, replicationFactor,
+      verifyRackAware = false, verifyRackUnAware = true)
+  }
+
   @Test
   def testDescribeAndListTopicsMarkedForDeletion() {
     val brokers = List(0)
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
index bece78b5eda..f7be87088a0 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
@@ -305,7 +305,7 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
 
   @Test
   def testControlledShutdown() {
-    val expectedReplicaAssignment = Map(1  -> List(0, 1, 2))
+    val expectedReplicaAssignment = Map(0  -> List(2, 0, 1), 1  -> List(0, 1, 2))
     val topic = "test"
     val partition = 1
     // create brokers
@@ -343,7 +343,7 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
     assertTrue(servers.forall(_.apis.metadataCache.getPartitionInfo(topic,partition).get.basePartitionState.leader == 0))
     controller.controlledShutdown(0, controlledShutdownCallback)
     partitionsRemaining = resultQueue.take().get
-    assertEquals(1, partitionsRemaining.size)
+    assertEquals(expectedReplicaAssignment.size, partitionsRemaining.size)
     // leader doesn't change since all the replicas are shut down
     assertTrue(servers.forall(_.apis.metadataCache.getPartitionInfo(topic,partition).get.basePartitionState.leader == 0))
   }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> We should guarantee at least one replica of a partition is alive when creating or updating a topic
> -----------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-7382
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7382
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.10.2.0
>            Reporter: zhaoshijie
>            Priority: Major
>
> For example: I have brokers 1, 2, 3, 4, 5 and I create a new topic with the command:
> {code:java}
> sh kafka-topics.sh --create --topic replicaserror --zookeeper localhost:2181 --replica-assignment 11:12:13,12:13:14,14:15:11,14:12:11,13:14:11
> {code}
> Then the KafkaController processes this request; after the partitionStateMachine and replicaStateMachine handle the state change, the topic's metadata and state end up in a strange condition: the partitions remain in NewPartition while the replicas are in OnlineReplica.
> Afterwards we cannot delete this topic (because the state change is illegal), which causes a number of problems. So I think we should validate the replica assignment when creating or updating a topic.
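
For illustration, here is a minimal, self-contained sketch (not the exact Kafka code) of the check the attached PR proposes: topic creation is rejected only when some partition has no replica on any live broker, rather than requiring every assigned broker to be live. The object and method names, and the way live broker ids are passed in, are illustrative assumptions.

{code:scala}
import org.apache.kafka.common.errors.BrokerNotAvailableException

// Illustrative sketch only; the object and method names are not part of the Kafka codebase.
object ReplicaAssignmentCheck {

  // Reject the assignment only if some partition has *no* replica on a live broker.
  def requireSomeReplicaAlive(assignment: Map[Int, Seq[Int]], liveBrokerIds: Set[Int]): Unit = {
    assignment.foreach { case (partitionId, replicas) =>
      if ((replicas.toSet -- liveBrokerIds).size == replicas.size)
        throw new BrokerNotAvailableException(
          s"All brokers specified for partition id $partitionId are not available. " +
            s"Specified brokers: ${replicas.mkString(", ")}, " +
            s"available brokers: ${liveBrokerIds.mkString(", ")}.")
    }
  }
}

// Scenario from the description above: brokers 1-5 are alive, but the requested
// assignment only references brokers 11-15, so creation should fail fast instead of
// leaving the topic in a half-created state.
// ReplicaAssignmentCheck.requireSomeReplicaAlive(
//   Map(0 -> Seq(11, 12, 13), 1 -> Seq(12, 13, 14), 2 -> Seq(14, 15, 11)),
//   liveBrokerIds = Set(1, 2, 3, 4, 5))
{code}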



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)