Posted to commits@kafka.apache.org by ju...@apache.org on 2016/03/15 18:03:10 UTC

[1/2] kafka git commit: KAFKA-1215; Rack-Aware replica assignment option

Repository: kafka
Updated Branches:
  refs/heads/trunk deb2b004c -> 951e30adc


http://git-wip-us.apache.org/repos/asf/kafka/blob/951e30ad/core/src/test/scala/unit/kafka/admin/AdminTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index 7c2577c..8910e09 100755
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -33,20 +33,20 @@ import TestUtils._
 
 import scala.collection.{Map, immutable}
 
-class AdminTest extends ZooKeeperTestHarness with Logging {
+class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
 
   @Test
   def testReplicaAssignment() {
-    val brokerList = List(0, 1, 2, 3, 4)
+    val brokerMetadatas = (0 to 4).map(new BrokerMetadata(_, None))
 
     // test 0 replication factor
     intercept[AdminOperationException] {
-      AdminUtils.assignReplicasToBrokers(brokerList, 10, 0)
+      AdminUtils.assignReplicasToBrokers(brokerMetadatas, 10, 0)
     }
 
     // test wrong replication factor
     intercept[AdminOperationException] {
-      AdminUtils.assignReplicasToBrokers(brokerList, 10, 6)
+      AdminUtils.assignReplicasToBrokers(brokerMetadatas, 10, 6)
     }
 
     // correct assignment
@@ -62,9 +62,8 @@ class AdminTest extends ZooKeeperTestHarness with Logging {
         8 -> List(3, 0, 1),
         9 -> List(4, 1, 2))
 
-    val actualAssignment = AdminUtils.assignReplicasToBrokers(brokerList, 10, 3, 0)
-    val e = (expectedAssignment.toList == actualAssignment.toList)
-    assertTrue(expectedAssignment.toList == actualAssignment.toList)
+    val actualAssignment = AdminUtils.assignReplicasToBrokers(brokerMetadatas, 10, 3, 0)
+    assertEquals(expectedAssignment, actualAssignment)
   }
 
   @Test
@@ -314,7 +313,8 @@ class AdminTest extends ZooKeeperTestHarness with Logging {
     val partition = 1
     val preferredReplica = 0
     // create brokers
-    val serverConfigs = TestUtils.createBrokerConfigs(3, zkConnect, false).map(KafkaConfig.fromProps)
+    val brokerRack = Map(0 -> "rack0", 1 -> "rack1", 2 -> "rack2")
+    val serverConfigs = TestUtils.createBrokerConfigs(3, zkConnect, false, rackInfo = brokerRack).map(KafkaConfig.fromProps)
     // create the topic
     AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, expectedReplicaAssignment)
     val servers = serverConfigs.reverseMap(s => TestUtils.createServer(s))
@@ -452,4 +452,35 @@ class AdminTest extends ZooKeeperTestHarness with Logging {
       server.config.logDirs.foreach(CoreUtils.rm(_))
     }
   }
+
+  @Test
+  def testGetBrokerMetadatas() {
+    // broker 4 has no rack information
+    val brokerList = 0 to 5
+    val rackInfo = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack1", 5 -> "rack3")
+    val brokerMetadatas = toBrokerMetadata(rackInfo, brokersWithoutRack = brokerList.filterNot(rackInfo.keySet))
+    TestUtils.createBrokersInZk(brokerMetadatas, zkUtils)
+
+    val processedMetadatas1 = AdminUtils.getBrokerMetadatas(zkUtils, RackAwareMode.Disabled)
+    assertEquals(brokerList, processedMetadatas1.map(_.id))
+    assertEquals(List.fill(brokerList.size)(None), processedMetadatas1.map(_.rack))
+
+    val processedMetadatas2 = AdminUtils.getBrokerMetadatas(zkUtils, RackAwareMode.Safe)
+    assertEquals(brokerList, processedMetadatas2.map(_.id))
+    assertEquals(List.fill(brokerList.size)(None), processedMetadatas2.map(_.rack))
+
+    intercept[AdminOperationException] {
+      AdminUtils.getBrokerMetadatas(zkUtils, RackAwareMode.Enforced)
+    }
+
+    val partialList = List(0, 1, 2, 3, 5)
+    val processedMetadatas3 = AdminUtils.getBrokerMetadatas(zkUtils, RackAwareMode.Enforced, Some(partialList))
+    assertEquals(partialList, processedMetadatas3.map(_.id))
+    assertEquals(partialList.map(rackInfo), processedMetadatas3.flatMap(_.rack))
+
+    val numPartitions = 3
+    AdminUtils.createTopic(zkUtils, "foo", numPartitions, 2, rackAwareMode = RackAwareMode.Safe)
+    val assignment = zkUtils.getReplicaAssignmentForTopics(Seq("foo"))
+    assertEquals(numPartitions, assignment.size)
+  }
 }

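For context, the new entry point above takes broker metadata rather than bare broker ids. A minimal sketch of both call shapes against this commit's AdminUtils (not committed code; broker ids, racks, and counts are illustrative):

    import kafka.admin.{AdminUtils, BrokerMetadata}

    // Rack-unaware: every broker carries rack = None, so the original round-robin path is used.
    val plain = (0 to 4).map(id => BrokerMetadata(id, None))
    val unaware = AdminUtils.assignReplicasToBrokers(plain, 10, 3, 0)   // fixedStartIndex = 0

    // Rack-aware: every broker must carry a rack, otherwise AdminOperationException is thrown.
    val racked = Seq(BrokerMetadata(0, Some("rack1")), BrokerMetadata(1, Some("rack2")),
                     BrokerMetadata(2, Some("rack2")))
    val aware = AdminUtils.assignReplicasToBrokers(racked, 6, 2)        // Map[Int, Seq[Int]]
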
http://git-wip-us.apache.org/repos/asf/kafka/blob/951e30ad/core/src/test/scala/unit/kafka/admin/RackAwareTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/RackAwareTest.scala b/core/src/test/scala/unit/kafka/admin/RackAwareTest.scala
new file mode 100644
index 0000000..facc745
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/admin/RackAwareTest.scala
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.admin
+
+import scala.collection.{Map, Seq, mutable}
+import org.junit.Assert._
+
+trait RackAwareTest {
+
+  def checkReplicaDistribution(assignment: Map[Int, Seq[Int]],
+                               brokerRackMapping: Map[Int, String],
+                               numBrokers: Int,
+                               numPartitions: Int,
+                               replicationFactor: Int,
+                               verifyRackAware: Boolean = true,
+                               verifyLeaderDistribution: Boolean = true,
+                               verifyReplicasDistribution: Boolean = true) {
+    // always verify that no broker is assigned more than one replica of the same partition
+    for ((_, brokerList) <- assignment) {
+      assertEquals("More than one replica is assigned to the same broker for the same partition", brokerList.toSet.size, brokerList.size)
+    }
+    val distribution = getReplicaDistribution(assignment, brokerRackMapping)
+
+    if (verifyRackAware) {
+      val partitionRackMap = distribution.partitionRacks
+      assertEquals("More than one replica of the same partition is assigned to the same rack",
+        List.fill(numPartitions)(replicationFactor), partitionRackMap.values.toList.map(_.distinct.size))
+    }
+
+    if (verifyLeaderDistribution) {
+      val leaderCount = distribution.brokerLeaderCount
+      val leaderCountPerBroker = numPartitions / numBrokers
+      assertEquals("Preferred leader count is not even for brokers", List.fill(numBrokers)(leaderCountPerBroker), leaderCount.values.toList)
+    }
+
+    if (verifyReplicasDistribution) {
+      val replicasCount = distribution.brokerReplicasCount
+      val numReplicasPerBroker = numPartitions * replicationFactor / numBrokers
+      assertEquals("Replica count is not even for broker", List.fill(numBrokers)(numReplicasPerBroker), replicasCount.values.toList)
+    }
+  }
+
+  def getReplicaDistribution(assignment: Map[Int, Seq[Int]], brokerRackMapping: Map[Int, String]): ReplicaDistributions = {
+    val leaderCount = mutable.Map[Int, Int]()
+    val partitionCount = mutable.Map[Int, Int]()
+    val partitionRackMap = mutable.Map[Int, List[String]]()
+    assignment.foreach { case (partitionId, replicaList) =>
+      val leader = replicaList.head
+      leaderCount(leader) = leaderCount.getOrElse(leader, 0) + 1
+      for (brokerId <- replicaList) {
+        partitionCount(brokerId) = partitionCount.getOrElse(brokerId, 0) + 1
+        val rack = brokerRackMapping.getOrElse(brokerId, sys.error(s"No mapping found for $brokerId in `brokerRackMapping`"))
+        partitionRackMap(partitionId) = rack :: partitionRackMap.getOrElse(partitionId, List())
+      }
+    }
+    ReplicaDistributions(partitionRackMap, leaderCount, partitionCount)
+  }
+
+  def toBrokerMetadata(rackMap: Map[Int, String], brokersWithoutRack: Seq[Int] = Seq.empty): Seq[BrokerMetadata] =
+    rackMap.toSeq.map { case (brokerId, rack) =>
+      BrokerMetadata(brokerId, Some(rack))
+    } ++ brokersWithoutRack.map { brokerId =>
+      BrokerMetadata(brokerId, None)
+    }.sortBy(_.id)
+
+}
+
+case class ReplicaDistributions(partitionRacks: Map[Int, Seq[String]], brokerLeaderCount: Map[Int, Int], brokerReplicasCount: Map[Int, Int])

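A sketch of how these helpers compose (hypothetical body of a test that mixes in RackAwareTest; AdminUtils comes from kafka.admin, and fixedStartIndex = 0 just makes the run deterministic):

    val rackInfo = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack1")
    val metadatas = toBrokerMetadata(rackInfo)              // every broker has a rack here
    val assignment = AdminUtils.assignReplicasToBrokers(metadatas, 8, 2, 0)
    checkReplicaDistribution(assignment, rackInfo, numBrokers = 4, numPartitions = 8, replicationFactor = 2)
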
http://git-wip-us.apache.org/repos/asf/kafka/blob/951e30ad/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
new file mode 100644
index 0000000..0f71a19
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.admin
+
+import kafka.utils.{Logging, TestUtils}
+import kafka.zk.ZooKeeperTestHarness
+import org.junit.Test
+
+class ReassignPartitionsCommandTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
+
+  @Test
+  def testRackAwareReassign() {
+    val rackInfo = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack1", 4 -> "rack3", 5 -> "rack3")
+    TestUtils.createBrokersInZk(toBrokerMetadata(rackInfo), zkUtils)
+
+    val numPartitions = 18
+    val replicationFactor = 3
+
+    // create a non rack aware assignment topic first
+    val createOpts = new kafka.admin.TopicCommand.TopicCommandOptions(Array(
+      "--partitions", numPartitions.toString,
+      "--replication-factor", replicationFactor.toString,
+      "--disable-rack-aware",
+      "--topic", "foo"))
+    kafka.admin.TopicCommand.createTopic(zkUtils, createOpts)
+
+    val topicJson = """{"topics": [{"topic": "foo"}], "version":1}"""
+    val (proposedAssignment, currentAssignment) = ReassignPartitionsCommand.generateAssignment(zkUtils,
+      rackInfo.keys.toSeq.sorted, topicJson, disableRackAware = false)
+
+    val assignment = proposedAssignment map { case (topicPartition, replicas) =>
+      (topicPartition.partition, replicas)
+    }
+    checkReplicaDistribution(assignment, rackInfo, rackInfo.size, numPartitions, replicationFactor)
+  }
+
+}

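The same generation step can be driven outside a test (sketch; signature as used above, with zkUtils coming from an existing ZkUtils connection):

    val topicsJson = """{"topics": [{"topic": "foo"}], "version":1}"""
    // Candidate brokers to move replicas onto; rack awareness is on unless disabled.
    val (proposed, current) = ReassignPartitionsCommand.generateAssignment(
      zkUtils, Seq(0, 1, 2, 3, 4, 5), topicsJson, disableRackAware = false)
    // `proposed` maps each topic partition to its replica list; the command
    // serializes this to JSON for the execute step.
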
http://git-wip-us.apache.org/repos/asf/kafka/blob/951e30ad/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
index d554b02..b42aaf4 100644
--- a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
@@ -27,7 +27,7 @@ import kafka.admin.TopicCommand.TopicCommandOptions
 import kafka.utils.ZkUtils._
 import kafka.coordinator.GroupCoordinator
 
-class TopicCommandTest extends ZooKeeperTestHarness with Logging {
+class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
 
   @Test
   def testConfigPreservationAcrossPartitionAlteration() {
@@ -157,4 +157,34 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging {
       Array("--partitions", numPartitions.toString, "--replication-factor", "1", "--topic", topic, "--if-not-exists"))
     TopicCommand.createTopic(zkUtils, createNotExistsOpts)
   }
+
+  @Test
+  def testCreateAlterTopicWithRackAware() {
+    val rackInfo = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack1", 4 -> "rack3", 5 -> "rack3")
+    TestUtils.createBrokersInZk(toBrokerMetadata(rackInfo), zkUtils)
+
+    val numPartitions = 18
+    val replicationFactor = 3
+    val createOpts = new TopicCommandOptions(Array(
+      "--partitions", numPartitions.toString,
+      "--replication-factor", replicationFactor.toString,
+      "--topic", "foo"))
+    TopicCommand.createTopic(zkUtils, createOpts)
+
+    var assignment = zkUtils.getReplicaAssignmentForTopics(Seq("foo")).map { case (tp, replicas) =>
+      tp.partition -> replicas
+    }
+    checkReplicaDistribution(assignment, rackInfo, rackInfo.size, numPartitions, replicationFactor)
+
+    val alteredNumPartitions = 36
+    // verify that adding partitions will also be rack aware
+    val alterOpts = new TopicCommandOptions(Array(
+      "--partitions", alteredNumPartitions.toString,
+      "--topic", "foo"))
+    TopicCommand.alterTopic(zkUtils, alterOpts)
+    assignment = zkUtils.getReplicaAssignmentForTopics(Seq("foo")).map { case (tp, replicas) =>
+      tp.partition -> replicas
+    }
+    checkReplicaDistribution(assignment, rackInfo, rackInfo.size, alteredNumPartitions, replicationFactor)
+  }
 }

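Conversely, rack awareness can be opted out of at creation time with the flag exercised in ReassignPartitionsCommandTest above (sketch; topic name and counts are illustrative):

    val opts = new TopicCommandOptions(Array(
      "--partitions", "6",
      "--replication-factor", "3",
      "--disable-rack-aware",        // fall back to the rack-unaware assignment
      "--topic", "bar"))
    TopicCommand.createTopic(zkUtils, opts)
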
http://git-wip-us.apache.org/repos/asf/kafka/blob/951e30ad/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala b/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala
index 905612c..400d6d6 100644
--- a/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala
@@ -28,20 +28,6 @@ import scala.collection.mutable
 class BrokerEndPointTest extends Logging {
 
   @Test
-  def testSerDe() {
-
-    val endpoint = new EndPoint("myhost", 9092, SecurityProtocol.PLAINTEXT)
-    val listEndPoints = Map(SecurityProtocol.PLAINTEXT -> endpoint)
-    val origBroker = new Broker(1, listEndPoints)
-    val brokerBytes = ByteBuffer.allocate(origBroker.sizeInBytes)
-
-    origBroker.writeTo(brokerBytes)
-
-    val newBroker = Broker.readFrom(brokerBytes.flip().asInstanceOf[ByteBuffer])
-    assert(origBroker == newBroker)
-  }
-
-  @Test
   def testHashAndEquals() {
     val endpoint1 = new EndPoint("myhost", 9092, SecurityProtocol.PLAINTEXT)
     val endpoint2 = new EndPoint("myhost", 9092, SecurityProtocol.PLAINTEXT)

http://git-wip-us.apache.org/repos/asf/kafka/blob/951e30ad/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 7524e6a..fa240d2 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -530,7 +530,7 @@ class KafkaConfigTest {
         case KafkaConfig.MetricNumSamplesProp => assertPropertyInvalid(getBaseProperties, name, "not_a_number", "-1", "0")
         case KafkaConfig.MetricSampleWindowMsProp => assertPropertyInvalid(getBaseProperties, name, "not_a_number", "-1", "0")
         case KafkaConfig.MetricReporterClassesProp => // ignore string
-
+        case KafkaConfig.RackProp => // ignore string
         //SSL Configs
         case KafkaConfig.PrincipalBuilderClassProp =>
         case KafkaConfig.SslProtocolProp => // ignore string

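The property guarded here is the new broker-side setting. A minimal sketch of configuring it, assuming this commit's KafkaConfig (connection values are illustrative):

    import java.util.Properties
    import kafka.server.KafkaConfig

    val props = new Properties()
    props.put("zookeeper.connect", "localhost:2181")
    props.put("broker.id", "0")
    props.put("broker.rack", "rack1")   // KafkaConfig.RackProp; optional free-form string
    val config = KafkaConfig.fromProps(props)
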
http://git-wip-us.apache.org/repos/asf/kafka/blob/951e30ad/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 2523083..49fb85f 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -27,29 +27,27 @@ import java.security.cert.X509Certificate
 import javax.net.ssl.X509TrustManager
 import charset.Charset
 
-import kafka.security.auth.{Resource, Authorizer, Acl}
+import kafka.security.auth.{Acl, Authorizer, Resource}
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.apache.kafka.common.utils.Utils._
 import org.apache.kafka.test.TestSslUtils
 
 import scala.collection.mutable.{ArrayBuffer, ListBuffer}
-
 import kafka.server._
 import kafka.producer._
 import kafka.message._
 import kafka.api._
-import kafka.cluster.Broker
-import kafka.consumer.{ConsumerTimeoutException, KafkaStream, ConsumerConfig}
-import kafka.serializer.{StringEncoder, DefaultEncoder, Encoder}
+import kafka.cluster.{Broker, EndPoint}
+import kafka.consumer.{ConsumerConfig, ConsumerTimeoutException, KafkaStream}
+import kafka.serializer.{DefaultEncoder, Encoder, StringEncoder}
 import kafka.common.TopicAndPartition
 import kafka.admin.AdminUtils
 import kafka.producer.ProducerConfig
 import kafka.log._
 import kafka.utils.ZkUtils._
-
 import org.junit.Assert._
 import org.apache.kafka.clients.producer.KafkaProducer
-import org.apache.kafka.clients.consumer.{RangeAssignor, KafkaConsumer}
+import org.apache.kafka.clients.consumer.{KafkaConsumer, RangeAssignor}
 import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.common.network.Mode
 
@@ -154,11 +152,12 @@ object TestUtils extends Logging {
     enablePlaintext: Boolean = true,
     enableSsl: Boolean = false,
     enableSaslPlaintext: Boolean = false,
-    enableSaslSsl: Boolean = false): Seq[Properties] = {
+    enableSaslSsl: Boolean = false,
+    rackInfo: Map[Int, String] = Map()): Seq[Properties] = {
     (0 until numConfigs).map { node =>
       createBrokerConfig(node, zkConnect, enableControlledShutdown, enableDeleteTopic, RandomPort,
         interBrokerSecurityProtocol, trustStoreFile, enablePlaintext = enablePlaintext, enableSsl = enableSsl,
-        enableSaslPlaintext = enableSaslPlaintext, enableSaslSsl = enableSaslSsl)
+        enableSaslPlaintext = enableSaslPlaintext, enableSaslSsl = enableSaslSsl, rack = rackInfo.get(node))
     }
   }
 
@@ -180,7 +179,7 @@ object TestUtils extends Logging {
     enablePlaintext: Boolean = true,
     enableSaslPlaintext: Boolean = false, saslPlaintextPort: Int = RandomPort,
     enableSsl: Boolean = false, sslPort: Int = RandomPort,
-    enableSaslSsl: Boolean = false, saslSslPort: Int = RandomPort)
+    enableSaslSsl: Boolean = false, saslSslPort: Int = RandomPort, rack: Option[String] = None)
   : Properties = {
 
     def shouldEnable(protocol: SecurityProtocol) = interBrokerSecurityProtocol.fold(false)(_ == protocol)
@@ -210,6 +209,7 @@ object TestUtils extends Logging {
     props.put("delete.topic.enable", enableDeleteTopic.toString)
     props.put("controlled.shutdown.retry.backoff.ms", "100")
     props.put("log.cleaner.dedupe.buffer.size", "2097152")
+    rack.foreach(props.put("broker.rack", _))
 
     if (protocolAndPorts.exists { case (protocol, _) => usesSslTransportLayer(protocol) })
       props.putAll(sslConfigs(Mode.SERVER, false, trustStoreFile, s"server$nodeId"))
@@ -591,9 +591,16 @@ object TestUtils extends Logging {
     }
   }
 
-  def createBrokersInZk(zkUtils: ZkUtils, ids: Seq[Int]): Seq[Broker] = {
-    val brokers = ids.map(id => new Broker(id, "localhost", 6667, SecurityProtocol.PLAINTEXT))
-    brokers.foreach(b => zkUtils.registerBrokerInZk(b.id, "localhost", 6667, b.endPoints, jmxPort = -1))
+  def createBrokersInZk(zkUtils: ZkUtils, ids: Seq[Int]): Seq[Broker] =
+    createBrokersInZk(ids.map(kafka.admin.BrokerMetadata(_, None)), zkUtils)
+
+  def createBrokersInZk(brokerMetadatas: Seq[kafka.admin.BrokerMetadata], zkUtils: ZkUtils): Seq[Broker] = {
+    val brokers = brokerMetadatas.map { b =>
+      val protocol = SecurityProtocol.PLAINTEXT
+      Broker(b.id, Map(protocol -> EndPoint("localhost", 6667, protocol)).toMap, b.rack)
+    }
+    brokers.foreach(b => zkUtils.registerBrokerInZk(b.id, "localhost", 6667, b.endPoints, jmxPort = -1,
+      rack = b.rack, ApiVersion.latestVersion))
     brokers
   }
 

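A sketch of the new overload in use from a ZooKeeperTestHarness-based test (zkUtils is supplied by the harness; ids and racks are illustrative):

    import kafka.admin.BrokerMetadata

    val metadatas = Seq(BrokerMetadata(0, Some("rack1")),
                        BrokerMetadata(1, Some("rack2")),
                        BrokerMetadata(2, None))          // rack is optional per broker
    val brokers = TestUtils.createBrokersInZk(metadatas, zkUtils)
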
http://git-wip-us.apache.org/repos/asf/kafka/blob/951e30ad/docs/upgrade.html
----------------------------------------------------------------------
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 15ea3ae..ba3d024 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -21,6 +21,11 @@
 0.10.0.0 has <a href="#upgrade_10_breaking">potential breaking changes</a> (please review before upgrading) and
 there may be a <a href="#upgrade_10_performance_impact">performance impact during the upgrade</a>. Because new protocols
 are introduced, it is important to upgrade your Kafka clusters before upgrading your clients.
+<p/>
+<b>Notes to clients with version 0.9.0.0: </b>Due to a bug introduced in 0.9.0.0,
+clients that depend on ZooKeeper (old Scala high-level Consumer and MirrorMaker if used with the old consumer) will not
+work with 0.10.0.x brokers. Therefore, 0.9.0.0 clients should be upgraded to 0.9.0.1 <b>before</b> brokers are upgraded to
+0.10.0.x. This step is not necessary for 0.8.X or 0.9.0.1 clients.
 
 <p><b>For a rolling upgrade:</b></p>
 


[2/2] kafka git commit: KAFKA-1215; Rack-Aware replica assignment option

Posted by ju...@apache.org.
KAFKA-1215; Rack-Aware replica assignment option

Please see https://cwiki.apache.org/confluence/display/KAFKA/KIP-36+Rack+aware+replica+assignment for the overall design.

The update to TopicMetadataRequest/TopicMetadataResponse will be done in a different PR.

Author: Allen Wang <aw...@netflix.com>
Author: Ismael Juma <is...@juma.me.uk>

Reviewers: Ismael Juma <is...@juma.me.uk>, Jason Gustafson <ja...@confluent.io>, Grant Henke <gr...@gmail.com>, Jun Rao <ju...@gmail.com>

Closes #132 from allenxwang/KAFKA-1215

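The heart of the change is a rack-alternated broker list built in AdminUtils.scala (full diff below). A standalone re-derivation of just that alternation step, for illustration only (not the committed code):

    // Round-robin across racks (sorted), taking the next sorted broker id from each rack.
    def rackAlternated(brokerRack: Map[Int, String]): Seq[Int] = {
      val brokersByRack = brokerRack.toSeq.groupBy { case (_, rack) => rack }
        .map { case (rack, pairs) => rack -> pairs.map(_._1).sorted.iterator }
      val racks = brokersByRack.keys.toArray.sorted
      val result = scala.collection.mutable.ArrayBuffer[Int]()
      var i = 0
      while (result.size < brokerRack.size) {
        val it = brokersByRack(racks(i % racks.length))
        if (it.hasNext) result += it.next()
        i += 1
      }
      result
    }
    // rackAlternated(Map(0 -> "rack1", 1 -> "rack3", 2 -> "rack3",
    //                    3 -> "rack2", 4 -> "rack2", 5 -> "rack1"))
    // == Seq(0, 3, 1, 5, 4, 2), matching the example in the AdminUtils doc comment.
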

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/951e30ad
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/951e30ad
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/951e30ad

Branch: refs/heads/trunk
Commit: 951e30adc6d4a0ed37dcc3fde0050ca5faff146d
Parents: deb2b00
Author: Allen Wang <aw...@netflix.com>
Authored: Tue Mar 15 10:03:03 2016 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Tue Mar 15 10:03:03 2016 -0700

----------------------------------------------------------------------
 .../apache/kafka/common/protocol/Protocol.java  |  22 +-
 .../kafka/common/protocol/types/Struct.java     |  13 +-
 .../common/requests/UpdateMetadataRequest.java  |  47 ++--
 .../common/requests/RequestResponseTest.java    |  43 ++--
 .../src/main/scala/kafka/admin/AdminUtils.scala | 235 ++++++++++++++++---
 .../main/scala/kafka/admin/BrokerMetadata.scala |  23 ++
 .../main/scala/kafka/admin/RackAwareMode.scala  |  42 ++++
 .../kafka/admin/ReassignPartitionsCommand.scala |  37 ++-
 .../main/scala/kafka/admin/TopicCommand.scala   |   5 +-
 core/src/main/scala/kafka/cluster/Broker.scala  | 112 ++++-----
 .../controller/ControllerChannelManager.scala   |  39 +--
 .../src/main/scala/kafka/server/KafkaApis.scala |   4 +-
 .../main/scala/kafka/server/KafkaConfig.scala   |  10 +
 .../scala/kafka/server/KafkaHealthcheck.scala   |  13 +-
 .../main/scala/kafka/server/KafkaServer.scala   |   2 +-
 .../main/scala/kafka/server/MetadataCache.scala |   2 +-
 core/src/main/scala/kafka/utils/ZkUtils.scala   |  74 +++---
 .../api/RackAwareAutoTopicCreationTest.scala    |  65 +++++
 .../unit/kafka/admin/AdminRackAwareTest.scala   | 196 ++++++++++++++++
 .../test/scala/unit/kafka/admin/AdminTest.scala |  47 +++-
 .../scala/unit/kafka/admin/RackAwareTest.scala  |  82 +++++++
 .../admin/ReassignPartitionsCommandTest.scala   |  51 ++++
 .../unit/kafka/admin/TopicCommandTest.scala     |  32 ++-
 .../unit/kafka/cluster/BrokerEndPointTest.scala |  14 --
 .../unit/kafka/server/KafkaConfigTest.scala     |   2 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala |  33 ++-
 docs/upgrade.html                               |   5 +
 27 files changed, 1000 insertions(+), 250 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/951e30ad/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index a77bf8c..e32d0b6 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -697,8 +697,26 @@ public class Protocol {
 
     public static final Schema UPDATE_METADATA_RESPONSE_V1 = UPDATE_METADATA_RESPONSE_V0;
 
-    public static final Schema[] UPDATE_METADATA_REQUEST = new Schema[] {UPDATE_METADATA_REQUEST_V0, UPDATE_METADATA_REQUEST_V1};
-    public static final Schema[] UPDATE_METADATA_RESPONSE = new Schema[] {UPDATE_METADATA_RESPONSE_V0, UPDATE_METADATA_RESPONSE_V1};
+    public static final Schema UPDATE_METADATA_REQUEST_PARTITION_STATE_V2 = UPDATE_METADATA_REQUEST_PARTITION_STATE_V1;
+
+    public static final Schema UPDATE_METADATA_REQUEST_END_POINT_V2 = UPDATE_METADATA_REQUEST_END_POINT_V1;
+
+    public static final Schema UPDATE_METADATA_REQUEST_BROKER_V2 =
+                    new Schema(new Field("id", INT32, "The broker id."),
+                               new Field("end_points", new ArrayOf(UPDATE_METADATA_REQUEST_END_POINT_V2)),
+                               new Field("rack", NULLABLE_STRING, "The rack"));
+
+    public static final Schema UPDATE_METADATA_REQUEST_V2 =
+            new Schema(new Field("controller_id", INT32, "The controller id."),
+                       new Field("controller_epoch", INT32, "The controller epoch."),
+                       new Field("partition_states", new ArrayOf(UPDATE_METADATA_REQUEST_PARTITION_STATE_V2)),
+                       new Field("live_brokers", new ArrayOf(UPDATE_METADATA_REQUEST_BROKER_V2)));
+
+    public static final Schema UPDATE_METADATA_RESPONSE_V2 = UPDATE_METADATA_RESPONSE_V1;
+
+
+    public static final Schema[] UPDATE_METADATA_REQUEST = new Schema[] {UPDATE_METADATA_REQUEST_V0, UPDATE_METADATA_REQUEST_V1, UPDATE_METADATA_REQUEST_V2};
+    public static final Schema[] UPDATE_METADATA_RESPONSE = new Schema[] {UPDATE_METADATA_RESPONSE_V0, UPDATE_METADATA_RESPONSE_V1, UPDATE_METADATA_RESPONSE_V2};
 
     /* an array of all requests and responses with all schema versions; a null value in the inner array means that the
      * particular version is not supported */

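For illustration, a broker entry under the new V2 schema carries a nullable rack. A sketch against this commit's client classes (constructing a raw Struct like this is for demonstration only; field names come from the schema above):

    import org.apache.kafka.common.protocol.Protocol
    import org.apache.kafka.common.protocol.types.Struct

    // A v2 broker entry; "rack" is NULLABLE_STRING, so null is legal on the wire.
    val broker = new Struct(Protocol.UPDATE_METADATA_REQUEST_BROKER_V2)
    broker.set("id", 0)
    broker.set("end_points", Array.empty[AnyRef])
    broker.set("rack", null)
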
http://git-wip-us.apache.org/repos/asf/kafka/blob/951e30ad/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
index 4902f25..79f0638 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
@@ -311,7 +311,10 @@ public class Struct {
                 for (Object arrayItem: arrayObject)
                     result = prime * result + arrayItem.hashCode();
             } else {
-                result = prime * result + this.get(f).hashCode();
+                Object field = this.get(f);
+                if (field != null) {
+                    result = prime * result + field.hashCode();
+                }
             }
         }
         return result;
@@ -330,11 +333,13 @@ public class Struct {
             return false;
         for (int i = 0; i < this.values.length; i++) {
             Field f = this.schema.get(i);
-            Boolean result;
+            boolean result;
             if (f.type() instanceof ArrayOf) {
-                result = Arrays.equals((Object []) this.get(f), (Object []) other.get(f));
+                result = Arrays.equals((Object[]) this.get(f), (Object[]) other.get(f));
             } else {
-                result = this.get(f).equals(other.get(f));
+                Object thisField = this.get(f);
+                Object otherField = other.get(f);
+                result = (thisField == null && otherField == null) || (thisField != null && thisField.equals(otherField));
             }
             if (!result)
                 return false;

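Why the null guards matter: two otherwise-equal structs may both leave a nullable field (such as the v2 rack) unset. A hypothetical check, under the same classpath assumptions as the sketch above:

    import org.apache.kafka.common.protocol.Protocol
    import org.apache.kafka.common.protocol.types.Struct

    def brokerStruct(rack: String): Struct = {
      val s = new Struct(Protocol.UPDATE_METADATA_REQUEST_BROKER_V2)
      s.set("id", 1)
      s.set("end_points", Array.empty[AnyRef])
      s.set("rack", rack)   // may be null under NULLABLE_STRING
      s
    }
    assert(brokerStruct(null) == brokerStruct(null))                     // no NPE with the guards
    assert(brokerStruct(null).hashCode == brokerStruct(null).hashCode)
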
http://git-wip-us.apache.org/repos/asf/kafka/blob/951e30ad/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
index d8d8013..4c3d0a7 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
@@ -49,16 +49,22 @@ public class UpdateMetadataRequest extends AbstractRequest {
             this.zkVersion = zkVersion;
             this.replicas = replicas;
         }
-
     }
 
     public static final class Broker {
         public final int id;
         public final Map<SecurityProtocol, EndPoint> endPoints;
+        public final String rack;
 
-        public Broker(int id, Map<SecurityProtocol, EndPoint> endPoints) {
+        public Broker(int id, Map<SecurityProtocol, EndPoint> endPoints, String rack) {
             this.id = id;
             this.endPoints = endPoints;
+            this.rack = rack;
+        }
+
+        @Deprecated
+        public Broker(int id, Map<SecurityProtocol, EndPoint> endPoints) {
+            this(id, endPoints, null);
         }
     }
 
@@ -91,6 +97,7 @@ public class UpdateMetadataRequest extends AbstractRequest {
     // Broker key names
     private static final String BROKER_ID_KEY_NAME = "id";
     private static final String ENDPOINTS_KEY_NAME = "end_points";
+    private static final String RACK_KEY_NAME = "rack";
 
     // EndPoint key names
     private static final String HOST_KEY_NAME = "host";
@@ -117,20 +124,20 @@ public class UpdateMetadataRequest extends AbstractRequest {
         for (BrokerEndPoint brokerEndPoint : brokerEndPoints) {
             Map<SecurityProtocol, EndPoint> endPoints = Collections.singletonMap(SecurityProtocol.PLAINTEXT,
                     new EndPoint(brokerEndPoint.host(), brokerEndPoint.port()));
-            brokers.add(new Broker(brokerEndPoint.id(), endPoints));
+            brokers.add(new Broker(brokerEndPoint.id(), endPoints, null));
         }
         return brokers;
     }
 
     /**
-     * Constructor for version 1.
+     * Constructor for version 2.
      */
     public UpdateMetadataRequest(int controllerId, int controllerEpoch, Map<TopicPartition,
             PartitionState> partitionStates, Set<Broker> liveBrokers) {
-        this(1, controllerId, controllerEpoch, partitionStates, liveBrokers);
+        this(2, controllerId, controllerEpoch, partitionStates, liveBrokers);
     }
 
-    private UpdateMetadataRequest(int version, int controllerId, int controllerEpoch, Map<TopicPartition,
+    public UpdateMetadataRequest(int version, int controllerId, int controllerEpoch, Map<TopicPartition,
             PartitionState> partitionStates, Set<Broker> liveBrokers) {
         super(new Struct(ProtoUtils.requestSchema(ApiKeys.UPDATE_METADATA_KEY.id, version)));
         struct.set(CONTROLLER_ID_KEY_NAME, controllerId);
@@ -173,6 +180,9 @@ public class UpdateMetadataRequest extends AbstractRequest {
 
                 }
                 brokerData.set(ENDPOINTS_KEY_NAME, endPointsData.toArray());
+                if (version >= 2) {
+                    brokerData.set(RACK_KEY_NAME, broker.rack);
+                }
             }
 
             brokersData.add(brokerData);
@@ -226,8 +236,8 @@ public class UpdateMetadataRequest extends AbstractRequest {
                 int port = brokerData.getInt(PORT_KEY_NAME);
                 Map<SecurityProtocol, EndPoint> endPoints = new HashMap<>(1);
                 endPoints.put(SecurityProtocol.PLAINTEXT, new EndPoint(host, port));
-                liveBrokers.add(new Broker(brokerId, endPoints));
-            } else { // V1
+                liveBrokers.add(new Broker(brokerId, endPoints, null));
+            } else { // V1 or V2
                 Map<SecurityProtocol, EndPoint> endPoints = new HashMap<>();
                 for (Object endPointDataObj : brokerData.getArray(ENDPOINTS_KEY_NAME)) {
                     Struct endPointData = (Struct) endPointDataObj;
@@ -236,11 +246,13 @@ public class UpdateMetadataRequest extends AbstractRequest {
                     short protocolTypeId = endPointData.getShort(SECURITY_PROTOCOL_TYPE_KEY_NAME);
                     endPoints.put(SecurityProtocol.forId(protocolTypeId), new EndPoint(host, port));
                 }
-                liveBrokers.add(new Broker(brokerId, endPoints));
+                String rack = null;
+                if (brokerData.hasField(RACK_KEY_NAME)) { // V2
+                    rack = brokerData.getString(RACK_KEY_NAME);
+                }
+                liveBrokers.add(new Broker(brokerId, endPoints, rack));
             }
-
         }
-
         controllerId = struct.getInt(CONTROLLER_ID_KEY_NAME);
         controllerEpoch = struct.getInt(CONTROLLER_EPOCH_KEY_NAME);
         this.partitionStates = partitionStates;
@@ -249,14 +261,11 @@ public class UpdateMetadataRequest extends AbstractRequest {
 
     @Override
     public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
-        switch (versionId) {
-            case 0:
-            case 1:
-                return new UpdateMetadataResponse(Errors.forException(e).code());
-            default:
-                throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
-                        versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.UPDATE_METADATA_KEY.id)));
-        }
+        if (versionId <= 2)
+            return new UpdateMetadataResponse(Errors.forException(e).code());
+        else
+            throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+                    versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.UPDATE_METADATA_KEY.id)));
     }
 
     public int controllerId() {

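Putting it together: a v2 request whose live brokers carry rack tags (sketch; types as declared in this file, with an empty partition-state map for brevity and illustrative host/port values):

    import java.util.{HashMap, HashSet, Collections}
    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.protocol.SecurityProtocol
    import org.apache.kafka.common.requests.UpdateMetadataRequest

    val endPoints = Collections.singletonMap(SecurityProtocol.PLAINTEXT,
      new UpdateMetadataRequest.EndPoint("host1", 9092))
    val liveBrokers = new HashSet[UpdateMetadataRequest.Broker]()
    liveBrokers.add(new UpdateMetadataRequest.Broker(0, endPoints, "rack1"))
    liveBrokers.add(new UpdateMetadataRequest.Broker(1, endPoints, null))  // unknown rack is allowed
    val request = new UpdateMetadataRequest(2, /* controllerId */ 1, /* controllerEpoch */ 10,
      new HashMap[TopicPartition, UpdateMetadataRequest.PartitionState](), liveBrokers)
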
http://git-wip-us.apache.org/repos/asf/kafka/blob/951e30ad/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 7ccf079..b556b46 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -86,8 +86,9 @@ public class RequestResponseTest {
                 createStopReplicaRequest(),
                 createStopReplicaRequest().getErrorResponse(0, new UnknownServerException()),
                 createStopReplicaResponse(),
-                createUpdateMetadataRequest(1),
-                createUpdateMetadataRequest(1).getErrorResponse(1, new UnknownServerException()),
+                createUpdateMetadataRequest(2, "rack1"),
+                createUpdateMetadataRequest(2, null),
+                createUpdateMetadataRequest(2, "rack1").getErrorResponse(2, new UnknownServerException()),
                 createUpdateMetadataResponse(),
                 createLeaderAndIsrRequest(),
                 createLeaderAndIsrRequest().getErrorResponse(0, new UnknownServerException()),
@@ -97,8 +98,11 @@ public class RequestResponseTest {
         for (AbstractRequestResponse req : requestResponseList)
             checkSerialization(req, null);
 
-        checkSerialization(createUpdateMetadataRequest(0), 0);
-        checkSerialization(createUpdateMetadataRequest(0).getErrorResponse(0, new UnknownServerException()), 0);
+        checkSerialization(createUpdateMetadataRequest(0, null), 0);
+        checkSerialization(createUpdateMetadataRequest(0, null).getErrorResponse(0, new UnknownServerException()), 0);
+        checkSerialization(createUpdateMetadataRequest(1, null), 1);
+        checkSerialization(createUpdateMetadataRequest(1, "rack1"), 1);
+        checkSerialization(createUpdateMetadataRequest(1, null).getErrorResponse(1, new UnknownServerException()), 1);
     }
 
     private void checkSerialization(AbstractRequestResponse req, Integer version) throws Exception {
@@ -120,7 +124,7 @@ public class RequestResponseTest {
 
     @Test
     public void produceResponseVersionTest() {
-        Map<TopicPartition, ProduceResponse.PartitionResponse> responseData = new HashMap<TopicPartition, ProduceResponse.PartitionResponse>();
+        Map<TopicPartition, ProduceResponse.PartitionResponse> responseData = new HashMap<>();
         responseData.put(new TopicPartition("test", 0), new ProduceResponse.PartitionResponse(Errors.NONE.code(), 10000, Record.NO_TIMESTAMP));
         ProduceResponse v0Response = new ProduceResponse(responseData);
         ProduceResponse v1Response = new ProduceResponse(responseData, 10, 1);
@@ -138,7 +142,7 @@ public class RequestResponseTest {
 
     @Test
     public void fetchResponseVersionTest() {
-        Map<TopicPartition, FetchResponse.PartitionData> responseData = new HashMap<TopicPartition, FetchResponse.PartitionData>();
+        Map<TopicPartition, FetchResponse.PartitionData> responseData = new HashMap<>();
         responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData(Errors.NONE.code(), 1000000, ByteBuffer.allocate(10)));
 
         FetchResponse v0Response = new FetchResponse(responseData);
@@ -192,14 +196,14 @@ public class RequestResponseTest {
     }
 
     private AbstractRequest createFetchRequest() {
-        Map<TopicPartition, FetchRequest.PartitionData> fetchData = new HashMap<TopicPartition, FetchRequest.PartitionData>();
+        Map<TopicPartition, FetchRequest.PartitionData> fetchData = new HashMap<>();
         fetchData.put(new TopicPartition("test1", 0), new FetchRequest.PartitionData(100, 1000000));
         fetchData.put(new TopicPartition("test2", 0), new FetchRequest.PartitionData(200, 1000000));
         return new FetchRequest(-1, 100, 100000, fetchData);
     }
 
     private AbstractRequestResponse createFetchResponse() {
-        Map<TopicPartition, FetchResponse.PartitionData> responseData = new HashMap<TopicPartition, FetchResponse.PartitionData>();
+        Map<TopicPartition, FetchResponse.PartitionData> responseData = new HashMap<>();
         responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData(Errors.NONE.code(), 1000000, ByteBuffer.allocate(10)));
         return new FetchResponse(responseData, 0);
     }
@@ -259,13 +263,13 @@ public class RequestResponseTest {
     }
 
     private AbstractRequest createListOffsetRequest() {
-        Map<TopicPartition, ListOffsetRequest.PartitionData> offsetData = new HashMap<TopicPartition, ListOffsetRequest.PartitionData>();
+        Map<TopicPartition, ListOffsetRequest.PartitionData> offsetData = new HashMap<>();
         offsetData.put(new TopicPartition("test", 0), new ListOffsetRequest.PartitionData(1000000L, 10));
         return new ListOffsetRequest(-1, offsetData);
     }
 
     private AbstractRequestResponse createListOffsetResponse() {
-        Map<TopicPartition, ListOffsetResponse.PartitionData> responseData = new HashMap<TopicPartition, ListOffsetResponse.PartitionData>();
+        Map<TopicPartition, ListOffsetResponse.PartitionData> responseData = new HashMap<>();
         responseData.put(new TopicPartition("test", 0), new ListOffsetResponse.PartitionData(Errors.NONE.code(), Arrays.asList(100L)));
         return new ListOffsetResponse(responseData);
     }
@@ -289,13 +293,13 @@ public class RequestResponseTest {
     }
 
     private AbstractRequest createOffsetCommitRequest() {
-        Map<TopicPartition, OffsetCommitRequest.PartitionData> commitData = new HashMap<TopicPartition, OffsetCommitRequest.PartitionData>();
+        Map<TopicPartition, OffsetCommitRequest.PartitionData> commitData = new HashMap<>();
         commitData.put(new TopicPartition("test", 0), new OffsetCommitRequest.PartitionData(100, ""));
         return new OffsetCommitRequest("group1", 100, "consumer1", 1000000, commitData);
     }
 
     private AbstractRequestResponse createOffsetCommitResponse() {
-        Map<TopicPartition, Short> responseData = new HashMap<TopicPartition, Short>();
+        Map<TopicPartition, Short> responseData = new HashMap<>();
         responseData.put(new TopicPartition("test", 0), Errors.NONE.code());
         return new OffsetCommitResponse(responseData);
     }
@@ -305,19 +309,19 @@ public class RequestResponseTest {
     }
 
     private AbstractRequestResponse createOffsetFetchResponse() {
-        Map<TopicPartition, OffsetFetchResponse.PartitionData> responseData = new HashMap<TopicPartition, OffsetFetchResponse.PartitionData>();
+        Map<TopicPartition, OffsetFetchResponse.PartitionData> responseData = new HashMap<>();
         responseData.put(new TopicPartition("test", 0), new OffsetFetchResponse.PartitionData(100L, "", Errors.NONE.code()));
         return new OffsetFetchResponse(responseData);
     }
 
     private AbstractRequest createProduceRequest() {
-        Map<TopicPartition, ByteBuffer> produceData = new HashMap<TopicPartition, ByteBuffer>();
+        Map<TopicPartition, ByteBuffer> produceData = new HashMap<>();
         produceData.put(new TopicPartition("test", 0), ByteBuffer.allocate(10));
         return new ProduceRequest((short) 1, 5000, produceData);
     }
 
     private AbstractRequestResponse createProduceResponse() {
-        Map<TopicPartition, ProduceResponse.PartitionResponse> responseData = new HashMap<TopicPartition, ProduceResponse.PartitionResponse>();
+        Map<TopicPartition, ProduceResponse.PartitionResponse> responseData = new HashMap<>();
         responseData.put(new TopicPartition("test", 0), new ProduceResponse.PartitionResponse(Errors.NONE.code(), 10000, Record.NO_TIMESTAMP));
         return new ProduceResponse(responseData, 0);
     }
@@ -371,7 +375,7 @@ public class RequestResponseTest {
     }
 
     @SuppressWarnings("deprecation")
-    private AbstractRequest createUpdateMetadataRequest(int version) {
+    private AbstractRequest createUpdateMetadataRequest(int version, String rack) {
         Map<TopicPartition, UpdateMetadataRequest.PartitionState> partitionStates = new HashMap<>();
         List<Integer> isr = Arrays.asList(1, 2);
         List<Integer> replicas = Arrays.asList(1, 2, 3, 4);
@@ -397,11 +401,10 @@ public class RequestResponseTest {
             endPoints2.put(SecurityProtocol.PLAINTEXT, new UpdateMetadataRequest.EndPoint("host1", 1244));
             endPoints2.put(SecurityProtocol.SSL, new UpdateMetadataRequest.EndPoint("host2", 1234));
 
-            Set<UpdateMetadataRequest.Broker> liveBrokers = new HashSet<>(Arrays.asList(new UpdateMetadataRequest.Broker(0, endPoints1),
-                    new UpdateMetadataRequest.Broker(1, endPoints2)
+            Set<UpdateMetadataRequest.Broker> liveBrokers = new HashSet<>(Arrays.asList(new UpdateMetadataRequest.Broker(0, endPoints1, rack),
+                    new UpdateMetadataRequest.Broker(1, endPoints2, rack)
             ));
-
-            return new UpdateMetadataRequest(1, 10, partitionStates, liveBrokers);
+            return new UpdateMetadataRequest(version, 1, 10, partitionStates, liveBrokers);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/951e30ad/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index 3fb44d3..24174be 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -19,7 +19,6 @@ package kafka.admin
 
 import kafka.common._
 import kafka.cluster.Broker
-
 import kafka.log.LogConfig
 import kafka.server.ConfigType
 import kafka.utils._
@@ -32,14 +31,12 @@ import org.apache.kafka.common.errors.{ReplicaNotAvailableException, InvalidTopi
 import org.apache.kafka.common.protocol.{Errors, SecurityProtocol}
 import org.apache.kafka.common.requests.MetadataResponse
 
-import scala.Predef._
 import scala.collection._
-import scala.collection.JavaConverters._
-import scala.collection.mutable
+import JavaConverters._
 import mutable.ListBuffer
+import scala.collection.mutable
 import collection.Map
 import collection.Set
-
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
 
 object AdminUtils extends Logging {
@@ -48,11 +45,13 @@ object AdminUtils extends Logging {
   val EntityConfigChangeZnodePrefix = "config_change_"
 
   /**
-   * There are 2 goals of replica assignment:
+   * There are 3 goals of replica assignment:
+   *
    * 1. Spread the replicas evenly among brokers.
    * 2. For partitions assigned to a particular broker, their other replicas are spread over the other brokers.
+   * 3. If all brokers have rack information, assign the replicas for each partition to different racks if possible.
    *
-   * To achieve this goal, we:
+   * To achieve these goals for replica assignment without considering racks, we:
    * 1. Assign the first replica of each partition by round-robin, starting from a random position in the broker list.
    * 2. Assign the remaining replicas of each partition with an increasing shift.
    *
@@ -64,39 +63,177 @@ object AdminUtils extends Logging {
    * p8        p9        p5        p6        p7       (2nd replica)
 * p3        p4        p0        p1        p2       (3rd replica)
 * p7        p8        p9        p5        p6       (3rd replica)
+   *
+   * To create a rack-aware assignment, this API will first create a rack-alternated broker list. For example,
+   * from this brokerID -> rack mapping:
+   *
+   * 0 -> "rack1", 1 -> "rack3", 2 -> "rack3", 3 -> "rack2", 4 -> "rack2", 5 -> "rack1"
+   *
+   * The rack alternated list will be:
+   *
+   * 0, 3, 1, 5, 4, 2
+   *
+   * Then an easy round-robin assignment can be applied. Assuming 6 partitions with a replication factor of 3, the assignment
+   * will be:
+   *
+   * 0 -> 0,3,1
+   * 1 -> 3,1,5
+   * 2 -> 1,5,4
+   * 3 -> 5,4,2
+   * 4 -> 4,2,0
+   * 5 -> 2,0,3
+   *
+   * Once it has completed the first round-robin, if there are more partitions to assign, the algorithm will start
+   * shifting the followers. This is to ensure we will not always get the same set of sequences.
+   * In this case, if there is another partition to assign (partition #6), the assignment will be:
+   *
+   * 6 -> 0,4,2 (instead of repeating 0,3,1 as partition 0)
+   *
+   * The rack-aware assignment always chooses the first replica of the partition using round robin on the rack-alternated
+   * broker list. For the rest of the replicas, it will be biased towards brokers on racks that do not have
+   * any replica assignment, until every rack has a replica. Then the assignment will go back to round-robin on
+   * the broker list.
+   *
+   * As a result, if the number of replicas is equal to or greater than the number of racks, it will ensure that
+   * each rack will get at least one replica. Otherwise, each rack will get at most one replica. In a perfect
+   * situation where the number of replicas is the same as the number of racks and each rack has the same number of
+   * brokers, it guarantees that the replica distribution is even across brokers and racks.
+   *
+   * @return a Map from partition id to replica ids
+   * @throws AdminOperationException If rack information is supplied but it is incomplete, or if it is not possible to
+   *                                 assign each replica to a unique rack.
+   *
    */
-  def assignReplicasToBrokers(brokerList: Seq[Int],
+  def assignReplicasToBrokers(brokerMetadatas: Seq[BrokerMetadata],
                               nPartitions: Int,
                               replicationFactor: Int,
                               fixedStartIndex: Int = -1,
-                              startPartitionId: Int = -1)
-  : Map[Int, Seq[Int]] = {
+                              startPartitionId: Int = -1): Map[Int, Seq[Int]] = {
     if (nPartitions <= 0)
       throw new AdminOperationException("number of partitions must be larger than 0")
     if (replicationFactor <= 0)
       throw new AdminOperationException("replication factor must be larger than 0")
-    if (replicationFactor > brokerList.size)
-      throw new AdminOperationException("replication factor: " + replicationFactor +
-        " larger than available brokers: " + brokerList.size)
-    val ret = new mutable.HashMap[Int, List[Int]]()
-    val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerList.size)
-    var currentPartitionId = if (startPartitionId >= 0) startPartitionId else 0
+    if (replicationFactor > brokerMetadatas.size)
+      throw new AdminOperationException(s"replication factor: $replicationFactor larger than available brokers: ${brokerMetadatas.size}")
+    if (brokerMetadatas.forall(_.rack.isEmpty))
+      assignReplicasToBrokersRackUnaware(nPartitions, replicationFactor, brokerMetadatas.map(_.id), fixedStartIndex,
+        startPartitionId)
+    else {
+      if (brokerMetadatas.exists(_.rack.isEmpty))
+        throw new AdminOperationException("Not all brokers have rack information for replica rack aware assignment")
+      assignReplicasToBrokersRackAware(nPartitions, replicationFactor, brokerMetadatas, fixedStartIndex,
+        startPartitionId)
+    }
+  }
 
-    var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerList.size)
-    for (i <- 0 until nPartitions) {
-      if (currentPartitionId > 0 && (currentPartitionId % brokerList.size == 0))
+  private def assignReplicasToBrokersRackUnaware(nPartitions: Int,
+                                                 replicationFactor: Int,
+                                                 brokerList: Seq[Int],
+                                                 fixedStartIndex: Int,
+                                                 startPartitionId: Int): Map[Int, Seq[Int]] = {
+    val ret = mutable.Map[Int, Seq[Int]]()
+    val brokerArray = brokerList.toArray
+    val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
+    var currentPartitionId = math.max(0, startPartitionId)
+    var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
+    for (_ <- 0 until nPartitions) {
+      if (currentPartitionId > 0 && (currentPartitionId % brokerArray.length == 0))
         nextReplicaShift += 1
-      val firstReplicaIndex = (currentPartitionId + startIndex) % brokerList.size
-      var replicaList = List(brokerList(firstReplicaIndex))
+      val firstReplicaIndex = (currentPartitionId + startIndex) % brokerArray.length
+      val replicaBuffer = mutable.ArrayBuffer(brokerArray(firstReplicaIndex))
       for (j <- 0 until replicationFactor - 1)
-        replicaList ::= brokerList(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerList.size))
-      ret.put(currentPartitionId, replicaList.reverse)
-      currentPartitionId = currentPartitionId + 1
+        replicaBuffer += brokerArray(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerArray.length))
+      ret.put(currentPartitionId, replicaBuffer)
+      currentPartitionId += 1
     }
-    ret.toMap
+    ret
+  }
+
+  private def assignReplicasToBrokersRackAware(nPartitions: Int,
+                                               replicationFactor: Int,
+                                               brokerMetadatas: Seq[BrokerMetadata],
+                                               fixedStartIndex: Int,
+                                               startPartitionId: Int): Map[Int, Seq[Int]] = {
+    val brokerRackMap = brokerMetadatas.collect { case BrokerMetadata(id, Some(rack)) =>
+      id -> rack
+    }.toMap
+    val numRacks = brokerRackMap.values.toSet.size
+    val arrangedBrokerList = getRackAlternatedBrokerList(brokerRackMap)
+    val numBrokers = arrangedBrokerList.size
+    val ret = mutable.Map[Int, Seq[Int]]()
+    val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(arrangedBrokerList.size)
+    var currentPartitionId = math.max(0, startPartitionId)
+    var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(arrangedBrokerList.size)
+    for (_ <- 0 until nPartitions) {
+      if (currentPartitionId > 0 && (currentPartitionId % arrangedBrokerList.size == 0))
+        nextReplicaShift += 1
+      val firstReplicaIndex = (currentPartitionId + startIndex) % arrangedBrokerList.size
+      val leader = arrangedBrokerList(firstReplicaIndex)
+      val replicaBuffer = mutable.ArrayBuffer(leader)
+      val racksWithReplicas = mutable.Set(brokerRackMap(leader))
+      val brokersWithReplicas = mutable.Set(leader)
+      var k = 0
+      for (_ <- 0 until replicationFactor - 1) {
+        var done = false
+        while (!done) {
+          val broker = arrangedBrokerList(replicaIndex(firstReplicaIndex, nextReplicaShift * numRacks, k, arrangedBrokerList.size))
+          val rack = brokerRackMap(broker)
+          // Skip this broker if
+          // 1. there is already a broker in the same rack that has been assigned a replica AND one or more racks
+          //    do not yet have any replica, or
+          // 2. the broker has already been assigned a replica AND one or more brokers do not yet have a replica assigned
+          if ((!racksWithReplicas.contains(rack) || racksWithReplicas.size == numRacks)
+              && (!brokersWithReplicas.contains(broker) || brokersWithReplicas.size == numBrokers)) {
+            replicaBuffer += broker
+            racksWithReplicas += rack
+            brokersWithReplicas += broker
+            done = true
+          }
+          k += 1
+        }
+      }
+      ret.put(currentPartitionId, replicaBuffer)
+      currentPartitionId += 1
+    }
+    ret
   }
 
+  /**
+    * Given broker and rack information, returns a list of brokers alternated by rack. Assume
+    * the following racks and their brokers:
+    *
+    * rack1: 0, 1, 2
+    * rack2: 3, 4, 5
+    * rack3: 6, 7, 8
+    *
+    * This API would return the list 0, 3, 6, 1, 4, 7, 2, 5, 8.
+    *
+    * This is essential to make sure that the assignReplicasToBrokers API can use such a list and
+    * assign replicas to brokers in a simple round-robin fashion, while ensuring an even
+    * distribution of leader and replica counts on each broker and that replicas are
+    * distributed to all racks.
+    */
+  private[admin] def getRackAlternatedBrokerList(brokerRackMap: Map[Int, String]): IndexedSeq[Int] = {
+    val brokersIteratorByRack = getInverseMap(brokerRackMap).map { case (rack, brokers) =>
+      (rack, brokers.toIterator)
+    }
+    val racks = brokersIteratorByRack.keys.toArray.sorted
+    val result = new mutable.ArrayBuffer[Int]
+    var rackIndex = 0
+    while (result.size < brokerRackMap.size) {
+      val rackIterator = brokersIteratorByRack(racks(rackIndex))
+      if (rackIterator.hasNext)
+        result += rackIterator.next()
+      rackIndex = (rackIndex + 1) % racks.length
+    }
+    result
+  }
 
+  private[admin] def getInverseMap(brokerRackMap: Map[Int, String]): Map[String, Seq[Int]] = {
+    brokerRackMap.toSeq.map { case (id, rack) => (rack, id) }
+      .groupBy { case (rack, _) => rack }
+      .map { case (rack, rackAndIdList) => (rack, rackAndIdList.map { case (_, id) => id }.sorted) }
+  }
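
To make the alternation concrete, here is a small self-contained sketch that mirrors (rather than calls) the two private helpers above, using the rack layout from the scaladoc:

    import scala.collection.mutable

    val brokerRackMap = Map(0 -> "rack1", 1 -> "rack1", 2 -> "rack1",
                            3 -> "rack2", 4 -> "rack2", 5 -> "rack2",
                            6 -> "rack3", 7 -> "rack3", 8 -> "rack3")
    // rack -> sorted broker ids, as getInverseMap computes.
    val brokersByRack = brokerRackMap.toSeq
      .groupBy { case (_, rack) => rack }
      .map { case (rack, pairs) => rack -> pairs.map { case (id, _) => id }.sorted }
    val racks = brokersByRack.keys.toArray.sorted
    // One iterator per rack; round-robin across racks in sorted order.
    val iterators = brokersByRack.map { case (rack, ids) => rack -> ids.iterator }
    val alternated = mutable.ArrayBuffer[Int]()
    var i = 0
    while (alternated.size < brokerRackMap.size) {
      val it = iterators(racks(i))
      if (it.hasNext) alternated += it.next()
      i = (i + 1) % racks.length
    }
    println(alternated.mkString(", "))  // 0, 3, 6, 1, 4, 7, 2, 5, 8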
  /**
   * Add partitions to existing topic with optional replica assignment
   *
@@ -110,7 +247,8 @@ object AdminUtils extends Logging {
                     topic: String,
                     numPartitions: Int = 1,
                     replicaAssignmentStr: String = "",
-                    checkBrokerAvailable: Boolean = true) {
+                    checkBrokerAvailable: Boolean = true,
+                    rackAwareMode: RackAwareMode = RackAwareMode.Enforced) {
     val existingPartitionsReplicaList = zkUtils.getReplicaAssignmentForTopics(List(topic))
     if (existingPartitionsReplicaList.size == 0)
       throw new AdminOperationException("The topic %s does not exist".format(topic))
@@ -124,16 +262,16 @@ object AdminUtils extends Logging {
       throw new AdminOperationException("The number of partitions for a topic can only be increased")
 
     // create the new partition replication list
-    val brokerList = zkUtils.getSortedBrokerList()
-    val newPartitionReplicaList = if (replicaAssignmentStr == null || replicaAssignmentStr == "") {
-      var startIndex = brokerList.indexWhere(_ >= existingReplicaListForPartitionZero.head)
-      if(startIndex < 0) {
-        startIndex = 0
+    val brokerMetadatas = getBrokerMetadatas(zkUtils, rackAwareMode)
+    val newPartitionReplicaList =
+      if (replicaAssignmentStr == null || replicaAssignmentStr == "") {
+        val startIndex = math.max(0, brokerMetadatas.indexWhere(_.id >= existingReplicaListForPartitionZero.head))
+        AdminUtils.assignReplicasToBrokers(brokerMetadatas, partitionsToAdd, existingReplicaListForPartitionZero.size,
+          startIndex, existingPartitionsReplicaList.size)
       }
-      AdminUtils.assignReplicasToBrokers(brokerList, partitionsToAdd, existingReplicaListForPartitionZero.size, startIndex, existingPartitionsReplicaList.size)
-    }
-    else
-      getManualReplicaAssignment(replicaAssignmentStr, brokerList.toSet, existingPartitionsReplicaList.size, checkBrokerAvailable)
+      else
+        getManualReplicaAssignment(replicaAssignmentStr, brokerMetadatas.map(_.id).toSet,
+          existingPartitionsReplicaList.size, checkBrokerAvailable)
 
     // check if manual assignment has the right replication factor
     val unmatchedRepFactorList = newPartitionReplicaList.values.filter(p => (p.size != existingReplicaListForPartitionZero.size))
@@ -237,13 +375,32 @@ object AdminUtils extends Logging {
   def topicExists(zkUtils: ZkUtils, topic: String): Boolean =
     zkUtils.zkClient.exists(getTopicPath(topic))
 
+  def getBrokerMetadatas(zkUtils: ZkUtils, rackAwareMode: RackAwareMode = RackAwareMode.Enforced,
+                        brokerList: Option[Seq[Int]] = None): Seq[BrokerMetadata] = {
+    val allBrokers = zkUtils.getAllBrokersInCluster()
+    val brokers = brokerList.map(brokerIds => allBrokers.filter(b => brokerIds.contains(b.id))).getOrElse(allBrokers)
+    val brokersWithRack = brokers.filter(_.rack.nonEmpty)
+    if (rackAwareMode == RackAwareMode.Enforced && brokersWithRack.nonEmpty && brokersWithRack.size < brokers.size) {
+      throw new AdminOperationException("Not all brokers have rack information. Pass --disable-rack-aware on the" +
+        " command line to perform replica assignment without rack information.")
+    }
+    val brokerMetadatas = rackAwareMode match {
+      case RackAwareMode.Disabled => brokers.map(broker => BrokerMetadata(broker.id, None))
+      case RackAwareMode.Safe if brokersWithRack.size < brokers.size =>
+        brokers.map(broker => BrokerMetadata(broker.id, None))
+      case _ => brokers.map(broker => BrokerMetadata(broker.id, broker.rack))
+    }
+    brokerMetadatas.sortBy(_.id)
+  }
+
   def createTopic(zkUtils: ZkUtils,
                   topic: String,
                   partitions: Int,
                   replicationFactor: Int,
-                  topicConfig: Properties = new Properties) {
-    val brokerList = zkUtils.getSortedBrokerList()
-    val replicaAssignment = AdminUtils.assignReplicasToBrokers(brokerList, partitions, replicationFactor)
+                  topicConfig: Properties = new Properties,
+                  rackAwareMode: RackAwareMode = RackAwareMode.Enforced) {
+    val brokerMetadatas = getBrokerMetadatas(zkUtils, rackAwareMode)
+    val replicaAssignment = AdminUtils.assignReplicasToBrokers(brokerMetadatas, partitions, replicationFactor)
     AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, replicaAssignment, topicConfig)
   }
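
A hedged usage sketch of the updated createTopic; the connected zkUtils instance and the topic name are assumptions for illustration:

    import kafka.admin.{AdminUtils, RackAwareMode}

    // Rack-aware by default (RackAwareMode.Enforced); fails fast if only
    // some brokers have rack information.
    AdminUtils.createTopic(zkUtils, "my-topic", partitions = 6, replicationFactor = 3)

    // Opt out explicitly, as TopicCommand does when --disable-rack-aware is passed.
    AdminUtils.createTopic(zkUtils, "my-topic", partitions = 6, replicationFactor = 3,
      rackAwareMode = RackAwareMode.Disabled)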
 
@@ -304,6 +461,7 @@ object AdminUtils extends Logging {
 
   /**
    * Update the config for a client and create a change notification so the change will propagate to other brokers
+   *
    * @param zkUtils Zookeeper utilities used to write the config to ZK
    * @param clientId: The clientId for which configs are being changed
    * @param configs: The final set of configs that will be applied to the topic. If any new configs need to be added or
@@ -316,6 +474,7 @@ object AdminUtils extends Logging {
 
   /**
    * Update the config for an existing topic and create a change notification so the change will propagate to other brokers
+   *
    * @param zkUtils Zookeeper utilities used to write the config to ZK
    * @param topic: The topic for which configs are being changed
    * @param configs: The final set of configs that will be applied to the topic. If any new configs need to be added or

http://git-wip-us.apache.org/repos/asf/kafka/blob/951e30ad/core/src/main/scala/kafka/admin/BrokerMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/BrokerMetadata.scala b/core/src/main/scala/kafka/admin/BrokerMetadata.scala
new file mode 100644
index 0000000..86831e3
--- /dev/null
+++ b/core/src/main/scala/kafka/admin/BrokerMetadata.scala
@@ -0,0 +1,23 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+  * License. You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+  * specific language governing permissions and limitations under the License.
+  */
+
+package kafka.admin
+
+/**
+  * Broker metadata used by admin tools.
+  *
+  * @param id an integer that uniquely identifies this broker
+  * @param rack the rack of the broker, which is used in rack aware partition assignment for fault tolerance.
+  *             Examples: "RACK1", "us-east-1d"
+  */
+case class BrokerMetadata(id: Int, rack: Option[String])

http://git-wip-us.apache.org/repos/asf/kafka/blob/951e30ad/core/src/main/scala/kafka/admin/RackAwareMode.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/RackAwareMode.scala b/core/src/main/scala/kafka/admin/RackAwareMode.scala
new file mode 100644
index 0000000..45555b6
--- /dev/null
+++ b/core/src/main/scala/kafka/admin/RackAwareMode.scala
@@ -0,0 +1,42 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+package kafka.admin
+
+/**
+  * Mode to control how rack aware replica assignment will be executed
+  */
+object RackAwareMode {
+
+  /**
+    * Ignore all rack information in replica assignment. This is an optional mode used in the command line tools.
+    */
+  case object Disabled extends RackAwareMode
+
+  /**
+    * Assume that either every broker has a rack or none of them has. If only some brokers have rack
+    * information, fail fast in replica assignment. This is the default mode in the command line tools
+    * (TopicCommand and ReassignPartitionsCommand).
+    */
+  case object Enforced extends RackAwareMode
+
+  /**
+    * Use rack information if every broker has a rack. Otherwise, fall back to Disabled mode. This is used in auto topic
+    * creation.
+    */
+  case object Safe extends RackAwareMode
+}
+
+sealed trait RackAwareMode
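
Putting the three modes together, a short illustrative sketch of how getBrokerMetadatas (shown earlier in this patch) resolves them; this mirrors that logic rather than adding behavior:

    import kafka.admin.{AdminOperationException, BrokerMetadata, RackAwareMode}

    // Illustrative only: mirrors the mode handling in AdminUtils.getBrokerMetadatas.
    def resolveRacks(mode: RackAwareMode, brokers: Seq[BrokerMetadata]): Seq[BrokerMetadata] = {
      val withRack = brokers.count(_.rack.nonEmpty)
      val partial = withRack > 0 && withRack < brokers.size
      mode match {
        case RackAwareMode.Enforced if partial =>
          throw new AdminOperationException("Not all brokers have rack information")
        case RackAwareMode.Disabled => brokers.map(_.copy(rack = None))   // ignore racks
        case RackAwareMode.Safe if partial => brokers.map(_.copy(rack = None)) // fall back
        case _ => brokers                                                 // use racks as-is
      }
    }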

http://git-wip-us.apache.org/repos/asf/kafka/blob/951e30ad/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index 13e423d..446ab9f 100755
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -91,23 +91,33 @@ object ReassignPartitionsCommand extends Logging {
     if (duplicateReassignments.nonEmpty)
       throw new AdminCommandFailedException("Broker list contains duplicate entries: %s".format(duplicateReassignments.mkString(",")))
     val topicsToMoveJsonString = Utils.readFileAsString(topicsToMoveJsonFile)
+    val disableRackAware = opts.options.has(opts.disableRackAware)
+    val (proposedAssignments, currentAssignments) = generateAssignment(zkUtils, brokerListToReassign, topicsToMoveJsonString, disableRackAware)
+    println("Current partition replica assignment\n\n%s".format(zkUtils.getPartitionReassignmentZkData(currentAssignments)))
+    println("Proposed partition reassignment configuration\n\n%s".format(zkUtils.getPartitionReassignmentZkData(proposedAssignments)))
+  }
+
+  def generateAssignment(zkUtils: ZkUtils, brokerListToReassign: Seq[Int], topicsToMoveJsonString: String, disableRackAware: Boolean): (Map[TopicAndPartition, Seq[Int]], Map[TopicAndPartition, Seq[Int]]) = {
     val topicsToReassign = zkUtils.parseTopicsData(topicsToMoveJsonString)
     val duplicateTopicsToReassign = CoreUtils.duplicates(topicsToReassign)
     if (duplicateTopicsToReassign.nonEmpty)
       throw new AdminCommandFailedException("List of topics to reassign contains duplicate entries: %s".format(duplicateTopicsToReassign.mkString(",")))
-    val topicPartitionsToReassign = zkUtils.getReplicaAssignmentForTopics(topicsToReassign)
-
-    var partitionsToBeReassigned : Map[TopicAndPartition, Seq[Int]] = new mutable.HashMap[TopicAndPartition, List[Int]]()
-    val groupedByTopic = topicPartitionsToReassign.groupBy(tp => tp._1.topic)
-    groupedByTopic.foreach { topicInfo =>
-      val assignedReplicas = AdminUtils.assignReplicasToBrokers(brokerListToReassign, topicInfo._2.size,
-        topicInfo._2.head._2.size)
-      partitionsToBeReassigned ++= assignedReplicas.map(replicaInfo => (TopicAndPartition(topicInfo._1, replicaInfo._1) -> replicaInfo._2))
+    val currentAssignment = zkUtils.getReplicaAssignmentForTopics(topicsToReassign)
+
+    val groupedByTopic = currentAssignment.groupBy { case (tp, _) => tp.topic }
+    val rackAwareMode = if (disableRackAware) RackAwareMode.Disabled else RackAwareMode.Enforced
+    val brokerMetadatas = AdminUtils.getBrokerMetadatas(zkUtils, rackAwareMode, Some(brokerListToReassign))
+
+    val partitionsToBeReassigned = mutable.Map[TopicAndPartition, Seq[Int]]()
+    groupedByTopic.foreach { case (topic, assignment) =>
+      val (_, replicas) = assignment.head
+      val assignedReplicas = AdminUtils.assignReplicasToBrokers(brokerMetadatas, assignment.size, replicas.size)
+      partitionsToBeReassigned ++= assignedReplicas.map { case (partition, replicas) =>
+        (TopicAndPartition(topic, partition) -> replicas)
+      }
     }
-    val currentPartitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(partitionsToBeReassigned.map(_._1.topic).toSeq)
-    println("Current partition replica assignment\n\n%s"
-      .format(zkUtils.getPartitionReassignmentZkData(currentPartitionReplicaAssignment)))
-    println("Proposed partition reassignment configuration\n\n%s".format(zkUtils.getPartitionReassignmentZkData(partitionsToBeReassigned)))
+
+    (partitionsToBeReassigned, currentAssignment)
   }
 
   def executeAssignment(zkUtils: ZkUtils, opts: ReassignPartitionsCommandOptions) {
@@ -200,7 +210,8 @@ object ReassignPartitionsCommand extends Logging {
                       .withRequiredArg
                       .describedAs("brokerlist")
                       .ofType(classOf[String])
-                      
+    val disableRackAware = parser.accepts("disable-rack-aware", "Disable rack aware replica assignment")
+
     if(args.length == 0)
       CommandLineUtils.printUsageAndDie(parser, "This command moves topic partitions between replicas.")
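
With the generation logic factored out, it can be driven directly (for example from tests); a sketch, assuming a connected zkUtils and the usual --topics-to-move-json-file format:

    import kafka.admin.ReassignPartitionsCommand

    val topicsToMove = """{"topics": [{"topic": "my-topic"}], "version": 1}"""
    val (proposed, current) = ReassignPartitionsCommand.generateAssignment(
      zkUtils, Seq(0, 1, 2, 3), topicsToMove, disableRackAware = false)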
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/951e30ad/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index d4212c5..e89e09d 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -105,7 +105,9 @@ object TopicCommand extends Logging {
         val partitions = opts.options.valueOf(opts.partitionsOpt).intValue
         val replicas = opts.options.valueOf(opts.replicationFactorOpt).intValue
         warnOnMaxMessagesChange(configs, replicas)
-        AdminUtils.createTopic(zkUtils, topic, partitions, replicas, configs)
+        val rackAwareMode = if (opts.options.has(opts.disableRackAware)) RackAwareMode.Disabled
+                            else RackAwareMode.Enforced
+        AdminUtils.createTopic(zkUtils, topic, partitions, replicas, configs, rackAwareMode)
       }
       println("Created topic \"%s\".".format(topic))
     } catch  {
@@ -324,6 +326,7 @@ object TopicCommand extends Logging {
     val ifNotExistsOpt = parser.accepts("if-not-exists",
                                         "if set when creating topics, the action will only execute if the topic does not already exist")
 
+    val disableRackAware = parser.accepts("disable-rack-aware", "Disable rack aware replica assignment")
     val options = parser.parse(args : _*)
 
     val allTopicLevelOpts: Set[OptionSpec[_]] = Set(alterOpt, createOpt, describeOpt, listOpt, deleteOpt)

http://git-wip-us.apache.org/repos/asf/kafka/blob/951e30ad/core/src/main/scala/kafka/cluster/Broker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Broker.scala b/core/src/main/scala/kafka/cluster/Broker.scala
index 7340f14..77b85e0 100755
--- a/core/src/main/scala/kafka/cluster/Broker.scala
+++ b/core/src/main/scala/kafka/cluster/Broker.scala
@@ -19,6 +19,7 @@ package kafka.cluster
 
 import java.nio.ByteBuffer
 
+import kafka.api.ApiUtils._
 import kafka.common.{BrokerEndPointNotAvailableException, BrokerNotAvailableException, KafkaException}
 import kafka.utils.Json
 import org.apache.kafka.common.Node
@@ -32,26 +33,41 @@ import org.apache.kafka.common.protocol.SecurityProtocol
 object Broker {
 
   /**
-   * Create a broker object from id and JSON string.
-   * @param id
-   * @param brokerInfoString
-   *
-   * Version 1 JSON schema for a broker is:
-   * {"version":1,
-   *  "host":"localhost",
-   *  "port":9092
-   *  "jmx_port":9999,
-   *  "timestamp":"2233345666" }
-   *
-   * The current JSON schema for a broker is:
-   * {"version":2,
-   *  "host","localhost",
-   *  "port",9092
-   *  "jmx_port":9999,
-   *  "timestamp":"2233345666",
-   *  "endpoints": ["PLAINTEXT://host1:9092",
-   *                "SSL://host1:9093"]
-   */
+    * Create a broker object from id and JSON string.
+    *
+    * @param id
+    * @param brokerInfoString
+    *
+    * Version 1 JSON schema for a broker is:
+    * {
+    *   "version":1,
+    *   "host":"localhost",
+    *   "port":9092
+    *   "jmx_port":9999,
+    *   "timestamp":"2233345666"
+    * }
+    *
+    * Version 2 JSON schema for a broker is:
+    * {
+    *   "version":2,
+    *   "host":"localhost",
+    *   "port":9092
+    *   "jmx_port":9999,
+    *   "timestamp":"2233345666",
+    *   "endpoints":["PLAINTEXT://host1:9092", "SSL://host1:9093"]
+    * }
+    *
+    * Version 3 (current) JSON schema for a broker is:
+    * {
+    *   "version":3,
+    *   "host":"localhost",
+    *   "port":9092
+    *   "jmx_port":9999,
+    *   "timestamp":"2233345666",
+    *   "endpoints":["PLAINTEXT://host1:9092", "SSL://host1:9093"],
+    *   "rack":"dc1"
+    * }
+    */
   def createBroker(id: Int, brokerInfoString: String): Broker = {
     if (brokerInfoString == null)
       throw new BrokerNotAvailableException(s"Broker id $id does not exist")
@@ -75,9 +91,8 @@ object Broker {
                 (ep.protocolType, ep)
               }.toMap
             }
-
-
-          new Broker(id, endpoints)
+          val rack = brokerInfo.get("rack").filter(_ != null).map(_.asInstanceOf[String])
+          new Broker(id, endpoints, rack)
         case None =>
           throw new BrokerNotAvailableException(s"Broker id $id does not exist")
       }
@@ -86,61 +101,34 @@ object Broker {
         throw new KafkaException(s"Failed to parse the broker info from zookeeper: $brokerInfoString", t)
     }
   }
-
-  /**
-   *
-   * @param buffer Containing serialized broker.
-   *               Current serialization is:
-   *               id (int), number of endpoints (int), serialized endpoints
-   * @return broker object
-   */
-  def readFrom(buffer: ByteBuffer): Broker = {
-    val id = buffer.getInt
-    val numEndpoints = buffer.getInt
-
-    val endpoints = List.range(0, numEndpoints).map(i => EndPoint.readFrom(buffer))
-            .map(ep => ep.protocolType -> ep).toMap
-    new Broker(id, endpoints)
-  }
 }
 
-case class Broker(id: Int, endPoints: collection.Map[SecurityProtocol, EndPoint]) {
+case class Broker(id: Int, endPoints: collection.Map[SecurityProtocol, EndPoint], rack: Option[String]) {
 
-  override def toString: String = id + " : " + endPoints.values.mkString("(",",",")")
+  override def toString: String =
+    s"$id : ${endPoints.values.mkString("(",",",")")} : ${rack.orNull}"
+
+  def this(id: Int, endPoints: Map[SecurityProtocol, EndPoint]) = {
+    this(id, endPoints, None)
+  }
 
   def this(id: Int, host: String, port: Int, protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT) = {
-    this(id, Map(protocol -> EndPoint(host, port, protocol)))
+    this(id, Map(protocol -> EndPoint(host, port, protocol)), None)
   }
 
   def this(bep: BrokerEndPoint, protocol: SecurityProtocol) = {
     this(bep.id, bep.host, bep.port, protocol)
   }
 
-
-  def writeTo(buffer: ByteBuffer) {
-    buffer.putInt(id)
-    buffer.putInt(endPoints.size)
-    for(endpoint <- endPoints.values) {
-      endpoint.writeTo(buffer)
-    }
-  }
-
-  def sizeInBytes: Int =
-    4 + /* broker id*/
-    4 + /* number of endPoints */
-    endPoints.values.map(_.sizeInBytes).sum /* end points */
-
-  def supportsChannel(protocolType: SecurityProtocol): Unit = {
-    endPoints.contains(protocolType)
-  }
-
   def getNode(protocolType: SecurityProtocol): Node = {
-    val endpoint = endPoints.getOrElse(protocolType, throw new BrokerEndPointNotAvailableException("End point %s not found for broker %d".format(protocolType,id)))
+    val endpoint = endPoints.getOrElse(protocolType,
+      throw new BrokerEndPointNotAvailableException(s"End point with security protocol $protocolType not found for broker $id"))
     new Node(id, endpoint.host, endpoint.port)
   }
 
   def getBrokerEndPoint(protocolType: SecurityProtocol): BrokerEndPoint = {
-    val endpoint = endPoints.getOrElse(protocolType, throw new BrokerEndPointNotAvailableException("End point %s not found for broker %d".format(protocolType,id)))
+    val endpoint = endPoints.getOrElse(protocolType,
+      throw new BrokerEndPointNotAvailableException(s"End point with security protocol $protocolType not found for broker $id"))
     new BrokerEndPoint(id, endpoint.host, endpoint.port)
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/951e30ad/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 3b1a458..ea156fa 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -16,24 +16,25 @@
 */
 package kafka.controller
 
-import kafka.api.{LeaderAndIsr, KAFKA_0_9_0, PartitionStateInfo}
+import java.net.SocketTimeoutException
+import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue}
+
+import kafka.api._
+import kafka.cluster.Broker
+import kafka.common.{KafkaException, TopicAndPartition}
+import kafka.server.KafkaConfig
 import kafka.utils._
-import org.apache.kafka.clients.{ClientResponse, ClientRequest, ManualMetadataUpdater, NetworkClient}
-import org.apache.kafka.common.{BrokerEndPoint, TopicPartition, Node}
+import org.apache.kafka.clients.{ClientRequest, ClientResponse, ManualMetadataUpdater, NetworkClient}
 import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.network.{LoginType, Selectable, ChannelBuilders, Selector, NetworkReceive, Mode}
-import org.apache.kafka.common.protocol.{SecurityProtocol, ApiKeys}
-import org.apache.kafka.common.requests._
+import org.apache.kafka.common.network.{ChannelBuilders, LoginType, Mode, NetworkReceive, Selectable, Selector}
+import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol}
+import org.apache.kafka.common.requests.{UpdateMetadataRequest, _}
 import org.apache.kafka.common.utils.Time
-import collection.mutable.HashMap
-import kafka.cluster.Broker
-import java.net.{SocketTimeoutException}
-import java.util.concurrent.{LinkedBlockingQueue, BlockingQueue}
-import kafka.server.KafkaConfig
-import collection.mutable
-import kafka.common.{KafkaException, TopicAndPartition}
-import collection.Set
-import collection.JavaConverters._
+import org.apache.kafka.common.{BrokerEndPoint, Node, TopicPartition}
+
+import scala.collection.JavaConverters._
+import scala.collection.{Set, mutable}
+import scala.collection.mutable.HashMap
 
 class ControllerChannelManager(controllerContext: ControllerContext, config: KafkaConfig, time: Time, metrics: Metrics, threadNamePrefix: Option[String] = None) extends Logging {
   protected val brokerStateInfo = new HashMap[Int, ControllerBrokerStateInfo]
@@ -380,7 +381,9 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends  Logging
           topicPartition -> partitionState
         }
 
-        val version = if (controller.config.interBrokerProtocolVersion >= KAFKA_0_9_0) (1: Short) else (0: Short)
+        val version = if (controller.config.interBrokerProtocolVersion >= KAFKA_0_10_0_IV0) 2: Short
+                      else if (controller.config.interBrokerProtocolVersion >= KAFKA_0_9_0) 1: Short
+                      else 0: Short
 
         val updateMetadataRequest =
           if (version == 0) {
@@ -395,9 +398,9 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends  Logging
               val endPoints = broker.endPoints.map { case (securityProtocol, endPoint) =>
                 securityProtocol -> new UpdateMetadataRequest.EndPoint(endPoint.host, endPoint.port)
               }
-              new UpdateMetadataRequest.Broker(broker.id, endPoints.asJava)
+              new UpdateMetadataRequest.Broker(broker.id, endPoints.asJava, broker.rack.orNull)
             }
-            new UpdateMetadataRequest(controllerId, controllerEpoch, partitionStates.asJava, liveBrokers.asJava)
+            new UpdateMetadataRequest(version, controllerId, controllerEpoch, partitionStates.asJava, liveBrokers.asJava)
           }
 
         controller.sendRequest(broker, ApiKeys.UPDATE_METADATA_KEY, Some(version), updateMetadataRequest, null)

http://git-wip-us.apache.org/repos/asf/kafka/blob/951e30ad/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 5f9ec8b..452f721 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -21,7 +21,7 @@ import java.nio.ByteBuffer
 import java.lang.{Long => JLong, Short => JShort}
 import java.util.Properties
 
-import kafka.admin.AdminUtils
+import kafka.admin.{RackAwareMode, AdminUtils}
 import kafka.api._
 import kafka.cluster.Partition
 import kafka.common._
@@ -624,7 +624,7 @@ class KafkaApis(val requestChannel: RequestChannel,
                           replicationFactor: Int,
                           properties: Properties = new Properties()): MetadataResponse.TopicMetadata = {
     try {
-      AdminUtils.createTopic(zkUtils, topic, numPartitions, replicationFactor, properties)
+      AdminUtils.createTopic(zkUtils, topic, numPartitions, replicationFactor, properties, RackAwareMode.Safe)
       info("Auto creation of topic %s with %d partitions and replication factor %d is successful"
         .format(topic, numPartitions, replicationFactor))
       new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, topic, java.util.Collections.emptyList())

http://git-wip-us.apache.org/repos/asf/kafka/blob/951e30ad/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 8d14edd..9c24876 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -222,6 +222,8 @@ object KafkaConfig {
   val MaxConnectionsPerIpProp = "max.connections.per.ip"
   val MaxConnectionsPerIpOverridesProp = "max.connections.per.ip.overrides"
   val ConnectionsMaxIdleMsProp = "connections.max.idle.ms"
+  /***************** rack configuration *************/
+  val RackProp = "broker.rack"
   /** ********* Log Configuration ***********/
   val NumPartitionsProp = "num.partitions"
   val LogDirsProp = "log.dirs"
@@ -388,6 +390,8 @@ object KafkaConfig {
   val MaxConnectionsPerIpDoc = "The maximum number of connections we allow from each ip address"
   val MaxConnectionsPerIpOverridesDoc = "Per-ip or hostname overrides to the default maximum number of connections"
   val ConnectionsMaxIdleMsDoc = "Idle connections timeout: the server socket processor threads close the connections that idle more than this"
+  /************* Rack Configuration **************/
+  val RackDoc = "Rack of the broker. This will be used in rack aware replication assignment for fault tolerance. Examples: `RACK1`, `us-east-1d`"
   /** ********* Log Configuration ***********/
   val NumPartitionsDoc = "The default number of log partitions per topic"
   val LogDirDoc = "The directory in which the log data is kept (supplemental for " + LogDirsProp + " property)"
@@ -571,6 +575,9 @@ object KafkaConfig {
       .define(MaxConnectionsPerIpOverridesProp, STRING, Defaults.MaxConnectionsPerIpOverrides, MEDIUM, MaxConnectionsPerIpOverridesDoc)
       .define(ConnectionsMaxIdleMsProp, LONG, Defaults.ConnectionsMaxIdleMs, MEDIUM, ConnectionsMaxIdleMsDoc)
 
+      /************ Rack Configuration ******************/
+      .define(RackProp, STRING, null, MEDIUM, RackDoc)
+
       /** ********* Log Configuration ***********/
       .define(NumPartitionsProp, INT, Defaults.NumPartitions, atLeast(1), MEDIUM, NumPartitionsDoc)
       .define(LogDirProp, STRING, Defaults.LogDir, HIGH, LogDirDoc)
@@ -771,6 +778,9 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra
     getMap(KafkaConfig.MaxConnectionsPerIpOverridesProp, getString(KafkaConfig.MaxConnectionsPerIpOverridesProp)).map { case (k, v) => (k, v.toInt)}
   val connectionsMaxIdleMs = getLong(KafkaConfig.ConnectionsMaxIdleMsProp)
 
+  /***************** rack configuration **************/
+  val rack = Option(getString(KafkaConfig.RackProp))
+
   /** ********* Log Configuration ***********/
   val autoCreateTopicsEnable = getBoolean(KafkaConfig.AutoCreateTopicsEnableProp)
   val numPartitions = getInt(KafkaConfig.NumPartitionsProp)

http://git-wip-us.apache.org/repos/asf/kafka/blob/951e30ad/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
index 928ff43..2598e6d 100644
--- a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
+++ b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
@@ -17,12 +17,14 @@
 
 package kafka.server
 
+import java.net.InetAddress
+
+import kafka.api.ApiVersion
 import kafka.cluster.EndPoint
 import kafka.utils._
+import org.I0Itec.zkclient.IZkStateListener
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.apache.zookeeper.Watcher.Event.KeeperState
-import org.I0Itec.zkclient.{IZkStateListener, ZkClient, ZkConnection}
-import java.net.InetAddress
 
 
 /**
@@ -35,7 +37,9 @@ import java.net.InetAddress
  */
 class KafkaHealthcheck(private val brokerId: Int,
                        private val advertisedEndpoints: Map[SecurityProtocol, EndPoint],
-                       private val zkUtils: ZkUtils) extends Logging {
+                       private val zkUtils: ZkUtils,
+                       private val rack: Option[String],
+                       private val interBrokerProtocolVersion: ApiVersion) extends Logging {
 
   val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + brokerId
   val sessionExpireListener = new SessionExpireListener
@@ -61,7 +65,8 @@ class KafkaHealthcheck(private val brokerId: Int,
     // only PLAINTEXT is supported as default
     // if the broker doesn't listen on PLAINTEXT protocol, an empty endpoint will be registered and older clients will be unable to connect
     val plaintextEndpoint = updatedEndpoints.getOrElse(SecurityProtocol.PLAINTEXT, new EndPoint(null,-1,null))
-    zkUtils.registerBrokerInZk(brokerId, plaintextEndpoint.host, plaintextEndpoint.port, updatedEndpoints, jmxPort)
+    zkUtils.registerBrokerInZk(brokerId, plaintextEndpoint.host, plaintextEndpoint.port, updatedEndpoints, jmxPort, rack,
+      interBrokerProtocolVersion)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/951e30ad/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 2f5441a..e29494b 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -239,7 +239,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
           else
             (protocol, endpoint)
         }
-        kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners, zkUtils)
+        kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners, zkUtils, config.rack, config.interBrokerProtocolVersion)
         kafkaHealthcheck.startup()
 
         // Now that the broker id is successfully registered via KafkaHealthcheck, checkpoint it

http://git-wip-us.apache.org/repos/asf/kafka/blob/951e30ad/core/src/main/scala/kafka/server/MetadataCache.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index 1fdd717..6df261c 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -159,7 +159,7 @@ private[server] class MetadataCache(brokerId: Int) extends Logging {
           endPoints.put(protocol, EndPoint(ep.host, ep.port, protocol))
           nodes.put(protocol, new Node(broker.id, ep.host, ep.port))
         }
-        aliveBrokers(broker.id) = Broker(broker.id, endPoints.asScala)
+        aliveBrokers(broker.id) = Broker(broker.id, endPoints.asScala, Option(broker.rack))
         aliveNodes(broker.id) = nodes.asScala
       }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/951e30ad/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index f39ed01..99c8196 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -18,31 +18,26 @@
 package kafka.utils
 
 import java.util.concurrent.CountDownLatch
+
+import kafka.admin._
+import kafka.api.{ApiVersion, KAFKA_0_10_0_IV0, LeaderAndIsr}
 import kafka.cluster._
+import kafka.common.{KafkaException, NoEpochForPartitionException, TopicAndPartition}
 import kafka.consumer.{ConsumerThreadId, TopicCount}
+import kafka.controller.{KafkaController, LeaderIsrAndControllerEpoch, ReassignedPartitionsContext}
 import kafka.server.ConfigType
-import org.I0Itec.zkclient.{ZkClient,ZkConnection}
-import org.I0Itec.zkclient.exception.{ZkException, ZkNodeExistsException, ZkNoNodeException,
-  ZkMarshallingError, ZkBadVersionException}
+import kafka.utils.ZkUtils._
+import org.I0Itec.zkclient.exception.{ZkBadVersionException, ZkException, ZkMarshallingError, ZkNoNodeException, ZkNodeExistsException}
 import org.I0Itec.zkclient.serialize.ZkSerializer
+import org.I0Itec.zkclient.{ZkClient, ZkConnection}
 import org.apache.kafka.common.config.ConfigException
 import org.apache.kafka.common.protocol.SecurityProtocol
-import org.apache.zookeeper.ZooDefs
-import scala.collection._
-import kafka.api.LeaderAndIsr
-import org.apache.zookeeper.data.{ACL, Stat}
-import kafka.admin._
-import kafka.common.{KafkaException, NoEpochForPartitionException}
-import kafka.controller.ReassignedPartitionsContext
-import kafka.controller.KafkaController
-import kafka.controller.LeaderIsrAndControllerEpoch
-import kafka.common.TopicAndPartition
-import kafka.utils.ZkUtils._
-import org.apache.zookeeper.AsyncCallback.{DataCallback,StringCallback}
-import org.apache.zookeeper.CreateMode
-import org.apache.zookeeper.KeeperException
+import org.apache.zookeeper.AsyncCallback.{DataCallback, StringCallback}
 import org.apache.zookeeper.KeeperException.Code
-import org.apache.zookeeper.ZooKeeper
+import org.apache.zookeeper.data.{ACL, Stat}
+import org.apache.zookeeper.{CreateMode, KeeperException, ZooDefs, ZooKeeper}
+
+import scala.collection._
 
 object ZkUtils {
   val ConsumersPath = "/consumers"
@@ -256,19 +251,43 @@ class ZkUtils(val zkClient: ZkClient,
   }
 
   /**
-   * Register brokers with v2 json format (which includes multiple endpoints).
+   * Register the broker with the v3 JSON format (which includes multiple endpoints and rack) if
+   * the apiVersion is 0.10.0.X or above; otherwise register it with the v2 JSON format.
+   * Due to KAFKA-3100, 0.9.0.0 brokers and old clients will break if the JSON version is above 2.
+   * We keep v2 to make it possible for the broker to migrate from 0.9.0.0 to 0.10.0.X without having to upgrade
+   * to 0.9.0.1 first (clients have to be upgraded to 0.9.0.1 in any case).
+   *
    * This format also includes default endpoints for compatibility with older clients.
-   * @param id
-   * @param host
-   * @param port
-   * @param advertisedEndpoints
-   * @param jmxPort
+   *
+   * @param id broker ID
+   * @param host broker host name
+   * @param port broker port
+   * @param advertisedEndpoints broker end points
+   * @param jmxPort jmx port
+   * @param rack broker rack
+   * @param apiVersion Kafka version the broker is running as
    */
-  def registerBrokerInZk(id: Int, host: String, port: Int, advertisedEndpoints: collection.Map[SecurityProtocol, EndPoint], jmxPort: Int) {
+  def registerBrokerInZk(id: Int,
+                         host: String,
+                         port: Int,
+                         advertisedEndpoints: collection.Map[SecurityProtocol, EndPoint],
+                         jmxPort: Int,
+                         rack: Option[String],
+                         apiVersion: ApiVersion) {
     val brokerIdPath = BrokerIdsPath + "/" + id
     val timestamp = SystemTime.milliseconds.toString
 
-    val brokerInfo = Json.encode(Map("version" -> 2, "host" -> host, "port" -> port, "endpoints"->advertisedEndpoints.values.map(_.connectionString).toArray, "jmx_port" -> jmxPort, "timestamp" -> timestamp))
+    val version = if (apiVersion >= KAFKA_0_10_0_IV0) 3 else 2
+    var jsonMap = Map("version" -> version,
+                      "host" -> host,
+                      "port" -> port,
+                      "endpoints" -> advertisedEndpoints.values.map(_.connectionString).toArray,
+                      "jmx_port" -> jmxPort,
+                      "timestamp" -> timestamp
+    )
+    rack.foreach(rack => if (version >= 3) jsonMap += ("rack" -> rack))
+
+    val brokerInfo = Json.encode(jsonMap)
     registerBrokerInZk(brokerIdPath, brokerInfo)
 
     info("Registered broker %d at path %s with addresses: %s".format(id, brokerIdPath, advertisedEndpoints.mkString(",")))
@@ -745,6 +764,7 @@ class ZkUtils(val zkClient: ZkClient,
   /**
    * This API takes in a broker id, queries zookeeper for the broker metadata and returns the metadata for that broker
    * or throws an exception if the broker dies before the query to zookeeper finishes
+   *
    * @param brokerId The broker id
    * @return An optional Broker object encapsulating the broker metadata
    */
@@ -768,7 +788,6 @@ class ZkUtils(val zkClient: ZkClient,
       case e: ZkNoNodeException => {
         createParentPath(BrokerSequenceIdPath, acls)
         try {
-          import scala.collection.JavaConversions._
           zkClient.createPersistent(BrokerSequenceIdPath, "", acls)
           0
         } catch {
@@ -880,7 +899,6 @@ class ZKConfig(props: VerifiableProperties) {
 
 object ZkPath {
   @volatile private var isNamespacePresent: Boolean = false
-  import scala.collection.JavaConversions._
 
   def checkNamespace(client: ZkClient) {
     if(isNamespacePresent)

http://git-wip-us.apache.org/repos/asf/kafka/blob/951e30ad/core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala b/core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala
new file mode 100644
index 0000000..a2f2041
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.api
+
+import java.util.Properties
+
+import kafka.admin.{RackAwareMode, AdminUtils, RackAwareTest}
+import kafka.integration.KafkaServerTestHarness
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.junit.Assert._
+import org.junit.Test
+import scala.collection.Map
+
+class RackAwareAutoTopicCreationTest extends KafkaServerTestHarness with RackAwareTest {
+  val numServers = 4
+  val numPartitions = 8
+  val replicationFactor = 2
+  val overridingProps = new Properties()
+  overridingProps.put(KafkaConfig.NumPartitionsProp, numPartitions.toString)
+  overridingProps.put(KafkaConfig.DefaultReplicationFactorProp, replicationFactor.toString)
+
+  def generateConfigs() =
+    (0 until numServers) map { node =>
+      TestUtils.createBrokerConfig(node, zkConnect, enableControlledShutdown = false, rack = Some((node / 2).toString))
+    } map (KafkaConfig.fromProps(_, overridingProps))
+
+  private val topic = "topic"
+
+  @Test
+  def testAutoCreateTopic() {
+    val producer = TestUtils.createNewProducer(brokerList, retries = 5)
+    try {
+      // Send a message to auto-create the topic
+      val record = new ProducerRecord(topic, null, "key".getBytes, "value".getBytes)
+      assertEquals("Should have offset 0", 0L, producer.send(record).get.offset)
+
+      // double check that the topic is created with leader elected
+      TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 0)
+      val assignment = zkUtils.getReplicaAssignmentForTopics(Seq(topic)).map { case (topicPartition, replicas) =>
+        topicPartition.partition -> replicas
+      }
+      val brokerMetadatas = AdminUtils.getBrokerMetadatas(zkUtils, RackAwareMode.Enforced)
+      val expectedMap = Map(0 -> "0", 1 -> "0", 2 -> "1", 3 -> "1")
+      assertEquals(expectedMap, brokerMetadatas.map(b => b.id -> b.rack.get).toMap)
+      checkReplicaDistribution(assignment, expectedMap, numServers, numPartitions, replicationFactor)
+    } finally producer.close()
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/951e30ad/core/src/test/scala/unit/kafka/admin/AdminRackAwareTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AdminRackAwareTest.scala b/core/src/test/scala/unit/kafka/admin/AdminRackAwareTest.scala
new file mode 100644
index 0000000..27ff4d4
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/admin/AdminRackAwareTest.scala
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.admin
+
+import kafka.utils.{Logging, TestUtils}
+import kafka.zk.ZooKeeperTestHarness
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.{Map, Seq}
+
+class AdminRackAwareTest extends RackAwareTest with Logging {
+
+  @Test
+  def testGetRackAlternatedBrokerListAndAssignReplicasToBrokers() {
+    val rackMap = Map(0 -> "rack1", 1 -> "rack3", 2 -> "rack3", 3 -> "rack2", 4 -> "rack2", 5 -> "rack1")
+    val newList = AdminUtils.getRackAlternatedBrokerList(rackMap)
+    assertEquals(List(0, 3, 1, 5, 4, 2), newList)
+    val anotherList = AdminUtils.getRackAlternatedBrokerList(rackMap - 5)
+    assertEquals(List(0, 3, 1, 4, 2), anotherList)
+    val assignment = AdminUtils.assignReplicasToBrokers(toBrokerMetadata(rackMap), 7, 3, 0, 0)
+    val expected = Map(0 -> List(0, 3, 1),
+                       1 -> List(3, 1, 5),
+                       2 -> List(1, 5, 4),
+                       3 -> List(5, 4, 2),
+                       4 -> List(4, 2, 0),
+                       5 -> List(2, 0, 3),
+                       6 -> List(0, 4, 2))
+    assertEquals(expected, assignment)
+  }
+
+  @Test
+  def testAssignmentWithRackAware() {
+    val brokerRackMapping = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack3", 4 -> "rack3", 5 -> "rack1")
+    val numPartitions = 6
+    val replicationFactor = 3
+    val assignment = AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions,
+      replicationFactor, 2, 0)
+    checkReplicaDistribution(assignment, brokerRackMapping, brokerRackMapping.size, numPartitions,
+      replicationFactor)
+  }
+
+  @Test
+  def testAssignmentWithRackAwareWithRandomStartIndex() {
+    val brokerRackMapping = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack3", 4 -> "rack3", 5 -> "rack1")
+    val numPartitions = 6
+    val replicationFactor = 3
+    val assignment = AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions,
+      replicationFactor)
+    checkReplicaDistribution(assignment, brokerRackMapping, brokerRackMapping.size, numPartitions,
+      replicationFactor)
+  }
+
+  @Test
+  def testAssignmentWithRackAwareWithUnevenReplicas() {
+    val brokerRackMapping = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack3", 4 -> "rack3", 5 -> "rack1")
+    val numPartitions = 13
+    val replicationFactor = 3
+    val assignment = AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions,
+      replicationFactor, 0, 0)
+    checkReplicaDistribution(assignment, brokerRackMapping, brokerRackMapping.size, numPartitions,
+      replicationFactor, verifyLeaderDistribution = false, verifyReplicasDistribution = false)
+  }
+
+  @Test
+  def testAssignmentWithRackAwareWithUnevenRacks() {
+    val brokerRackMapping = Map(0 -> "rack1", 1 -> "rack1", 2 -> "rack2", 3 -> "rack3", 4 -> "rack3", 5 -> "rack1")
+    val numPartitions = 12
+    val replicationFactor = 3
+    val assignment = AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions,
+      replicationFactor)
+    checkReplicaDistribution(assignment, brokerRackMapping, brokerRackMapping.size, numPartitions,
+      replicationFactor, verifyReplicasDistribution = false)
+  }
+
+  @Test
+  def testAssignmentWith2ReplicasRackAware() {
+    val brokerRackMapping = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack3", 4 -> "rack3", 5 -> "rack1")
+    val numPartitions = 12
+    val replicationFactor = 2
+    val assignment = AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions,
+      replicationFactor)
+    checkReplicaDistribution(assignment, brokerRackMapping, brokerRackMapping.size, numPartitions,
+      replicationFactor)
+  }
+
+  @Test
+  def testRackAwareExpansion() {
+    val brokerRackMapping = Map(6 -> "rack1", 7 -> "rack2", 8 -> "rack2", 9 -> "rack3", 10 -> "rack3", 11 -> "rack1")
+    val numPartitions = 12
+    val replicationFactor = 2
+    val assignment = AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions,
+      replicationFactor, startPartitionId = 12)
+    checkReplicaDistribution(assignment, brokerRackMapping, brokerRackMapping.size, numPartitions,
+      replicationFactor)
+  }
+
+  @Test
+  def testAssignmentWith2ReplicasRackAwareWith6Partitions() {
+    val brokerRackMapping = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack3", 4 -> "rack3", 5 -> "rack1")
+    val numPartitions = 6
+    val replicationFactor = 2
+    val assignment = AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions,
+      replicationFactor)
+    checkReplicaDistribution(assignment, brokerRackMapping, brokerRackMapping.size, numPartitions,
+      replicationFactor)
+  }
+
+  @Test
+  def testAssignmentWith2ReplicasRackAwareWith6PartitionsAnd3Brokers() {
+    val brokerRackMapping = Map(0 -> "rack1", 1 -> "rack2", 4 -> "rack3")
+    val numPartitions = 3
+    val replicationFactor = 2
+    val assignment = AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, replicationFactor)
+    checkReplicaDistribution(assignment, brokerRackMapping, brokerRackMapping.size, numPartitions, replicationFactor)
+  }
+
+  @Test
+  def testLargeNumberPartitionsAssignment() {
+    val numPartitions = 96
+    val replicationFactor = 3
+    val brokerRackMapping = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack3", 4 -> "rack3", 5 -> "rack1",
+      6 -> "rack1", 7 -> "rack2", 8 -> "rack2", 9 -> "rack3", 10 -> "rack1", 11 -> "rack3")
+    val assignment = AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions,
+      replicationFactor)
+    checkReplicaDistribution(assignment, brokerRackMapping, brokerRackMapping.size, numPartitions,
+      replicationFactor)
+  }
+
+  @Test
+  def testMoreReplicasThanRacks() {
+    val numPartitions = 6
+    val replicationFactor = 5
+    val brokerRackMapping = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack3", 4 -> "rack3", 5 -> "rack2")
+    val assignment = AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, replicationFactor)
+    assertEquals(List.fill(assignment.size)(replicationFactor), assignment.values.map(_.size))
+    val distribution = getReplicaDistribution(assignment, brokerRackMapping)
+    for (partition <- 0 until numPartitions)
+      assertEquals(3, distribution.partitionRacks(partition).toSet.size)
+  }
+
+  @Test
+  def testLessReplicasThanRacks() {
+    val numPartitions = 6
+    val replicationFactor = 2
+    val brokerRackMapping = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack3", 4 -> "rack3", 5 -> "rack2")
+    val assignment = AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions,
+      replicationFactor)
+    assertEquals(List.fill(assignment.size)(replicationFactor), assignment.values.map(_.size))
+    val distribution = getReplicaDistribution(assignment, brokerRackMapping)
+    for (partition <- 0 to 5)
+      assertEquals(2, distribution.partitionRacks(partition).toSet.size)
+  }
+
+  @Test
+  def testSingleRack() {
+    val numPartitions = 6
+    val replicationFactor = 3
+    val brokerRackMapping = Map(0 -> "rack1", 1 -> "rack1", 2 -> "rack1", 3 -> "rack1", 4 -> "rack1", 5 -> "rack1")
+    val assignment = AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, replicationFactor)
+    assertEquals(List.fill(assignment.size)(replicationFactor), assignment.values.map(_.size))
+    val distribution = getReplicaDistribution(assignment, brokerRackMapping)
+    for (partition <- 0 until numPartitions)
+      assertEquals(1, distribution.partitionRacks(partition).toSet.size)
+    for (broker <- brokerRackMapping.keys)
+      assertEquals(1, distribution.brokerLeaderCount(broker))
+  }
+
+  @Test
+  def testSkipBrokerWithReplicaAlreadyAssigned() {
+    val rackInfo = Map(0 -> "a", 1 -> "b", 2 -> "c", 3 -> "a", 4 -> "a")
+    val brokerList = 0 to 4
+    val numPartitions = 6
+    val replicationFactor = 4
+    val brokerMetadatas = toBrokerMetadata(rackInfo)
+    assertEquals(brokerList, brokerMetadatas.map(_.id))
+    val assignment = AdminUtils.assignReplicasToBrokers(brokerMetadatas, numPartitions, replicationFactor,
+      fixedStartIndex = 2)
+    checkReplicaDistribution(assignment, rackInfo, 5, 6, 4,
+      verifyRackAware = false, verifyLeaderDistribution = false, verifyReplicasDistribution = false)
+  }
+}