You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2016/03/15 18:03:10 UTC
[1/2] kafka git commit: KAFKA-1215;
Rack-Aware replica assignment option
Repository: kafka
Updated Branches:
refs/heads/trunk deb2b004c -> 951e30adc
http://git-wip-us.apache.org/repos/asf/kafka/blob/951e30ad/core/src/test/scala/unit/kafka/admin/AdminTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index 7c2577c..8910e09 100755
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -33,20 +33,20 @@ import TestUtils._
import scala.collection.{Map, immutable}
-class AdminTest extends ZooKeeperTestHarness with Logging {
+class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
@Test
def testReplicaAssignment() {
- val brokerList = List(0, 1, 2, 3, 4)
+ val brokerMetadatas = (0 to 4).map(new BrokerMetadata(_, None))
// test 0 replication factor
intercept[AdminOperationException] {
- AdminUtils.assignReplicasToBrokers(brokerList, 10, 0)
+ AdminUtils.assignReplicasToBrokers(brokerMetadatas, 10, 0)
}
// test wrong replication factor
intercept[AdminOperationException] {
- AdminUtils.assignReplicasToBrokers(brokerList, 10, 6)
+ AdminUtils.assignReplicasToBrokers(brokerMetadatas, 10, 6)
}
// correct assignment
@@ -62,9 +62,8 @@ class AdminTest extends ZooKeeperTestHarness with Logging {
8 -> List(3, 0, 1),
9 -> List(4, 1, 2))
- val actualAssignment = AdminUtils.assignReplicasToBrokers(brokerList, 10, 3, 0)
- val e = (expectedAssignment.toList == actualAssignment.toList)
- assertTrue(expectedAssignment.toList == actualAssignment.toList)
+ val actualAssignment = AdminUtils.assignReplicasToBrokers(brokerMetadatas, 10, 3, 0)
+ assertEquals(expectedAssignment, actualAssignment)
}
@Test
@@ -314,7 +313,8 @@ class AdminTest extends ZooKeeperTestHarness with Logging {
val partition = 1
val preferredReplica = 0
// create brokers
- val serverConfigs = TestUtils.createBrokerConfigs(3, zkConnect, false).map(KafkaConfig.fromProps)
+ val brokerRack = Map(0 -> "rack0", 1 -> "rack1", 2 -> "rack2")
+ val serverConfigs = TestUtils.createBrokerConfigs(3, zkConnect, false, rackInfo = brokerRack).map(KafkaConfig.fromProps)
// create the topic
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, expectedReplicaAssignment)
val servers = serverConfigs.reverseMap(s => TestUtils.createServer(s))
@@ -452,4 +452,35 @@ class AdminTest extends ZooKeeperTestHarness with Logging {
server.config.logDirs.foreach(CoreUtils.rm(_))
}
}
+
+ @Test
+ def testGetBrokerMetadatas() {
+ // broker 4 has no rack information
+ val brokerList = 0 to 5
+ val rackInfo = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack1", 5 -> "rack3")
+ val brokerMetadatas = toBrokerMetadata(rackInfo, brokersWithoutRack = brokerList.filterNot(rackInfo.keySet))
+ TestUtils.createBrokersInZk(brokerMetadatas, zkUtils)
+
+ val processedMetadatas1 = AdminUtils.getBrokerMetadatas(zkUtils, RackAwareMode.Disabled)
+ assertEquals(brokerList, processedMetadatas1.map(_.id))
+ assertEquals(List.fill(brokerList.size)(None), processedMetadatas1.map(_.rack))
+
+ val processedMetadatas2 = AdminUtils.getBrokerMetadatas(zkUtils, RackAwareMode.Safe)
+ assertEquals(brokerList, processedMetadatas2.map(_.id))
+ assertEquals(List.fill(brokerList.size)(None), processedMetadatas2.map(_.rack))
+
+ intercept[AdminOperationException] {
+ AdminUtils.getBrokerMetadatas(zkUtils, RackAwareMode.Enforced)
+ }
+
+ val partialList = List(0, 1, 2, 3, 5)
+ val processedMetadatas3 = AdminUtils.getBrokerMetadatas(zkUtils, RackAwareMode.Enforced, Some(partialList))
+ assertEquals(partialList, processedMetadatas3.map(_.id))
+ assertEquals(partialList.map(rackInfo), processedMetadatas3.flatMap(_.rack))
+
+ val numPartitions = 3
+ AdminUtils.createTopic(zkUtils, "foo", numPartitions, 2, rackAwareMode = RackAwareMode.Safe)
+ val assignment = zkUtils.getReplicaAssignmentForTopics(Seq("foo"))
+ assertEquals(numPartitions, assignment.size)
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/951e30ad/core/src/test/scala/unit/kafka/admin/RackAwareTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/RackAwareTest.scala b/core/src/test/scala/unit/kafka/admin/RackAwareTest.scala
new file mode 100644
index 0000000..facc745
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/admin/RackAwareTest.scala
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.admin
+
+import scala.collection.{Map, Seq, mutable}
+import org.junit.Assert._
+
+trait RackAwareTest {
+
+ def checkReplicaDistribution(assignment: Map[Int, Seq[Int]],
+ brokerRackMapping: Map[Int, String],
+ numBrokers: Int,
+ numPartitions: Int,
+ replicationFactor: Int,
+ verifyRackAware: Boolean = true,
+ verifyLeaderDistribution: Boolean = true,
+ verifyReplicasDistribution: Boolean = true) {
+ // always verify that no broker will be assigned for more than one replica
+ for ((_, brokerList) <- assignment) {
+ assertEquals("More than one replica is assigned to same broker for the same partition", brokerList.toSet.size, brokerList.size)
+ }
+ val distribution = getReplicaDistribution(assignment, brokerRackMapping)
+
+ if (verifyRackAware) {
+ val partitionRackMap = distribution.partitionRacks
+ assertEquals("More than one replica of the same partition is assigned to the same rack",
+ List.fill(numPartitions)(replicationFactor), partitionRackMap.values.toList.map(_.distinct.size))
+ }
+
+ if (verifyLeaderDistribution) {
+ val leaderCount = distribution.brokerLeaderCount
+ val leaderCountPerBroker = numPartitions / numBrokers
+ assertEquals("Preferred leader count is not even for brokers", List.fill(numBrokers)(leaderCountPerBroker), leaderCount.values.toList)
+ }
+
+ if (verifyReplicasDistribution) {
+ val replicasCount = distribution.brokerReplicasCount
+ val numReplicasPerBroker = numPartitions * replicationFactor / numBrokers
+ assertEquals("Replica count is not even for broker", List.fill(numBrokers)(numReplicasPerBroker), replicasCount.values.toList)
+ }
+ }
+
+ def getReplicaDistribution(assignment: Map[Int, Seq[Int]], brokerRackMapping: Map[Int, String]): ReplicaDistributions = {
+ val leaderCount = mutable.Map[Int, Int]()
+ val partitionCount = mutable.Map[Int, Int]()
+ val partitionRackMap = mutable.Map[Int, List[String]]()
+ assignment.foreach { case (partitionId, replicaList) =>
+ val leader = replicaList.head
+ leaderCount(leader) = leaderCount.getOrElse(leader, 0) + 1
+ for (brokerId <- replicaList) {
+ partitionCount(brokerId) = partitionCount.getOrElse(brokerId, 0) + 1
+ val rack = brokerRackMapping.getOrElse(brokerId, sys.error(s"No mapping found for $brokerId in `brokerRackMapping`"))
+ partitionRackMap(partitionId) = rack :: partitionRackMap.getOrElse(partitionId, List())
+ }
+ }
+ ReplicaDistributions(partitionRackMap, leaderCount, partitionCount)
+ }
+
+ def toBrokerMetadata(rackMap: Map[Int, String], brokersWithoutRack: Seq[Int] = Seq.empty): Seq[BrokerMetadata] =
+ rackMap.toSeq.map { case (brokerId, rack) =>
+ BrokerMetadata(brokerId, Some(rack))
+ } ++ brokersWithoutRack.map { brokerId =>
+ BrokerMetadata(brokerId, None)
+ }.sortBy(_.id)
+
+}
+
+case class ReplicaDistributions(partitionRacks: Map[Int, Seq[String]], brokerLeaderCount: Map[Int, Int], brokerReplicasCount: Map[Int, Int])
http://git-wip-us.apache.org/repos/asf/kafka/blob/951e30ad/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
new file mode 100644
index 0000000..0f71a19
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.admin
+
+import kafka.utils.{Logging, TestUtils}
+import kafka.zk.ZooKeeperTestHarness
+import org.junit.Test
+
+class ReassignPartitionsCommandTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
+
+ @Test
+ def testRackAwareReassign() {
+ val rackInfo = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack1", 4 -> "rack3", 5 -> "rack3")
+ TestUtils.createBrokersInZk(toBrokerMetadata(rackInfo), zkUtils)
+
+ val numPartitions = 18
+ val replicationFactor = 3
+
+ // create a non rack aware assignment topic first
+ val createOpts = new kafka.admin.TopicCommand.TopicCommandOptions(Array(
+ "--partitions", numPartitions.toString,
+ "--replication-factor", replicationFactor.toString,
+ "--disable-rack-aware",
+ "--topic", "foo"))
+ kafka.admin.TopicCommand.createTopic(zkUtils, createOpts)
+
+ val topicJson = """{"topics": [{"topic": "foo"}], "version":1}"""
+ val (proposedAssignment, currentAssignment) = ReassignPartitionsCommand.generateAssignment(zkUtils,
+ rackInfo.keys.toSeq.sorted, topicJson, disableRackAware = false)
+
+ val assignment = proposedAssignment map { case (topicPartition, replicas) =>
+ (topicPartition.partition, replicas)
+ }
+ checkReplicaDistribution(assignment, rackInfo, rackInfo.size, numPartitions, replicationFactor)
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/951e30ad/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
index d554b02..b42aaf4 100644
--- a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
@@ -27,7 +27,7 @@ import kafka.admin.TopicCommand.TopicCommandOptions
import kafka.utils.ZkUtils._
import kafka.coordinator.GroupCoordinator
-class TopicCommandTest extends ZooKeeperTestHarness with Logging {
+class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
@Test
def testConfigPreservationAcrossPartitionAlteration() {
@@ -157,4 +157,34 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging {
Array("--partitions", numPartitions.toString, "--replication-factor", "1", "--topic", topic, "--if-not-exists"))
TopicCommand.createTopic(zkUtils, createNotExistsOpts)
}
+
+ @Test
+ def testCreateAlterTopicWithRackAware() {
+ val rackInfo = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack1", 4 -> "rack3", 5 -> "rack3")
+ TestUtils.createBrokersInZk(toBrokerMetadata(rackInfo), zkUtils)
+
+ val numPartitions = 18
+ val replicationFactor = 3
+ val createOpts = new TopicCommandOptions(Array(
+ "--partitions", numPartitions.toString,
+ "--replication-factor", replicationFactor.toString,
+ "--topic", "foo"))
+ TopicCommand.createTopic(zkUtils, createOpts)
+
+ var assignment = zkUtils.getReplicaAssignmentForTopics(Seq("foo")).map { case (tp, replicas) =>
+ tp.partition -> replicas
+ }
+ checkReplicaDistribution(assignment, rackInfo, rackInfo.size, numPartitions, replicationFactor)
+
+ val alteredNumPartitions = 36
+ // verify that adding partitions will also be rack aware
+ val alterOpts = new TopicCommandOptions(Array(
+ "--partitions", alteredNumPartitions.toString,
+ "--topic", "foo"))
+ TopicCommand.alterTopic(zkUtils, alterOpts)
+ assignment = zkUtils.getReplicaAssignmentForTopics(Seq("foo")).map { case (tp, replicas) =>
+ tp.partition -> replicas
+ }
+ checkReplicaDistribution(assignment, rackInfo, rackInfo.size, alteredNumPartitions, replicationFactor)
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/951e30ad/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala b/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala
index 905612c..400d6d6 100644
--- a/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala
@@ -28,20 +28,6 @@ import scala.collection.mutable
class BrokerEndPointTest extends Logging {
@Test
- def testSerDe() {
-
- val endpoint = new EndPoint("myhost", 9092, SecurityProtocol.PLAINTEXT)
- val listEndPoints = Map(SecurityProtocol.PLAINTEXT -> endpoint)
- val origBroker = new Broker(1, listEndPoints)
- val brokerBytes = ByteBuffer.allocate(origBroker.sizeInBytes)
-
- origBroker.writeTo(brokerBytes)
-
- val newBroker = Broker.readFrom(brokerBytes.flip().asInstanceOf[ByteBuffer])
- assert(origBroker == newBroker)
- }
-
- @Test
def testHashAndEquals() {
val endpoint1 = new EndPoint("myhost", 9092, SecurityProtocol.PLAINTEXT)
val endpoint2 = new EndPoint("myhost", 9092, SecurityProtocol.PLAINTEXT)
http://git-wip-us.apache.org/repos/asf/kafka/blob/951e30ad/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 7524e6a..fa240d2 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -530,7 +530,7 @@ class KafkaConfigTest {
case KafkaConfig.MetricNumSamplesProp => assertPropertyInvalid(getBaseProperties, name, "not_a_number", "-1", "0")
case KafkaConfig.MetricSampleWindowMsProp => assertPropertyInvalid(getBaseProperties, name, "not_a_number", "-1", "0")
case KafkaConfig.MetricReporterClassesProp => // ignore string
-
+ case KafkaConfig.RackProp => // ignore string
//SSL Configs
case KafkaConfig.PrincipalBuilderClassProp =>
case KafkaConfig.SslProtocolProp => // ignore string
http://git-wip-us.apache.org/repos/asf/kafka/blob/951e30ad/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 2523083..49fb85f 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -27,29 +27,27 @@ import java.security.cert.X509Certificate
import javax.net.ssl.X509TrustManager
import charset.Charset
-import kafka.security.auth.{Resource, Authorizer, Acl}
+import kafka.security.auth.{Acl, Authorizer, Resource}
import org.apache.kafka.common.protocol.SecurityProtocol
import org.apache.kafka.common.utils.Utils._
import org.apache.kafka.test.TestSslUtils
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
-
import kafka.server._
import kafka.producer._
import kafka.message._
import kafka.api._
-import kafka.cluster.Broker
-import kafka.consumer.{ConsumerTimeoutException, KafkaStream, ConsumerConfig}
-import kafka.serializer.{StringEncoder, DefaultEncoder, Encoder}
+import kafka.cluster.{Broker, EndPoint}
+import kafka.consumer.{ConsumerConfig, ConsumerTimeoutException, KafkaStream}
+import kafka.serializer.{DefaultEncoder, Encoder, StringEncoder}
import kafka.common.TopicAndPartition
import kafka.admin.AdminUtils
import kafka.producer.ProducerConfig
import kafka.log._
import kafka.utils.ZkUtils._
-
import org.junit.Assert._
import org.apache.kafka.clients.producer.KafkaProducer
-import org.apache.kafka.clients.consumer.{RangeAssignor, KafkaConsumer}
+import org.apache.kafka.clients.consumer.{KafkaConsumer, RangeAssignor}
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.common.network.Mode
@@ -154,11 +152,12 @@ object TestUtils extends Logging {
enablePlaintext: Boolean = true,
enableSsl: Boolean = false,
enableSaslPlaintext: Boolean = false,
- enableSaslSsl: Boolean = false): Seq[Properties] = {
+ enableSaslSsl: Boolean = false,
+ rackInfo: Map[Int, String] = Map()): Seq[Properties] = {
(0 until numConfigs).map { node =>
createBrokerConfig(node, zkConnect, enableControlledShutdown, enableDeleteTopic, RandomPort,
interBrokerSecurityProtocol, trustStoreFile, enablePlaintext = enablePlaintext, enableSsl = enableSsl,
- enableSaslPlaintext = enableSaslPlaintext, enableSaslSsl = enableSaslSsl)
+ enableSaslPlaintext = enableSaslPlaintext, enableSaslSsl = enableSaslSsl, rack = rackInfo.get(node))
}
}
@@ -180,7 +179,7 @@ object TestUtils extends Logging {
enablePlaintext: Boolean = true,
enableSaslPlaintext: Boolean = false, saslPlaintextPort: Int = RandomPort,
enableSsl: Boolean = false, sslPort: Int = RandomPort,
- enableSaslSsl: Boolean = false, saslSslPort: Int = RandomPort)
+ enableSaslSsl: Boolean = false, saslSslPort: Int = RandomPort, rack: Option[String] = None)
: Properties = {
def shouldEnable(protocol: SecurityProtocol) = interBrokerSecurityProtocol.fold(false)(_ == protocol)
@@ -210,6 +209,7 @@ object TestUtils extends Logging {
props.put("delete.topic.enable", enableDeleteTopic.toString)
props.put("controlled.shutdown.retry.backoff.ms", "100")
props.put("log.cleaner.dedupe.buffer.size", "2097152")
+ rack.foreach(props.put("broker.rack", _))
if (protocolAndPorts.exists { case (protocol, _) => usesSslTransportLayer(protocol) })
props.putAll(sslConfigs(Mode.SERVER, false, trustStoreFile, s"server$nodeId"))
@@ -591,9 +591,16 @@ object TestUtils extends Logging {
}
}
- def createBrokersInZk(zkUtils: ZkUtils, ids: Seq[Int]): Seq[Broker] = {
- val brokers = ids.map(id => new Broker(id, "localhost", 6667, SecurityProtocol.PLAINTEXT))
- brokers.foreach(b => zkUtils.registerBrokerInZk(b.id, "localhost", 6667, b.endPoints, jmxPort = -1))
+ def createBrokersInZk(zkUtils: ZkUtils, ids: Seq[Int]): Seq[Broker] =
+ createBrokersInZk(ids.map(kafka.admin.BrokerMetadata(_, None)), zkUtils)
+
+ def createBrokersInZk(brokerMetadatas: Seq[kafka.admin.BrokerMetadata], zkUtils: ZkUtils): Seq[Broker] = {
+ val brokers = brokerMetadatas.map { b =>
+ val protocol = SecurityProtocol.PLAINTEXT
+ Broker(b.id, Map(protocol -> EndPoint("localhost", 6667, protocol)).toMap, b.rack)
+ }
+ brokers.foreach(b => zkUtils.registerBrokerInZk(b.id, "localhost", 6667, b.endPoints, jmxPort = -1,
+ rack = b.rack, ApiVersion.latestVersion))
brokers
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/951e30ad/docs/upgrade.html
----------------------------------------------------------------------
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 15ea3ae..ba3d024 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -21,6 +21,11 @@
0.10.0.0 has <a href="#upgrade_10_breaking">potential breaking changes</a> (please review before upgrading) and
there may be a <a href="#upgrade_10_performance_impact">performance impact during the upgrade</a>. Because new protocols
are introduced, it is important to upgrade your Kafka clusters before upgrading your clients.
+<p/>
+<b>Notes to clients with version 0.9.0.0: </b>Due to a bug introduced in 0.9.0.0,
+clients that depend on ZooKeeper (old Scala high-level Consumer and MirrorMaker if used with the old consumer) will not
+work with 0.10.0.x brokers. Therefore, 0.9.0.0 clients should be upgraded to 0.9.0.1 <b>before</b> brokers are upgraded to
+0.10.0.x. This step is not necessary for 0.8.X or 0.9.0.1 clients.
<p><b>For a rolling upgrade:</b></p>
[2/2] kafka git commit: KAFKA-1215;
Rack-Aware replica assignment option
Posted by ju...@apache.org.
KAFKA-1215; Rack-Aware replica assignment option
Please see https://cwiki.apache.org/confluence/display/KAFKA/KIP-36+Rack+aware+replica+assignment for the overall design.
The update to TopicMetadataRequest/TopicMetadataResponse will be done in a different PR.
Author: Allen Wang <aw...@netflix.com>
Author: Ismael Juma <is...@juma.me.uk>
Reviewers: Ismael Juma <is...@juma.me.uk>, Jason Gustafson <ja...@confluent.io>, Grant Henke <gr...@gmail.com>, Jun Rao <ju...@gmail.com>
Closes #132 from allenxwang/KAFKA-1215
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/951e30ad
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/951e30ad
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/951e30ad
Branch: refs/heads/trunk
Commit: 951e30adc6d4a0ed37dcc3fde0050ca5faff146d
Parents: deb2b00
Author: Allen Wang <aw...@netflix.com>
Authored: Tue Mar 15 10:03:03 2016 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Tue Mar 15 10:03:03 2016 -0700
----------------------------------------------------------------------
.../apache/kafka/common/protocol/Protocol.java | 22 +-
.../kafka/common/protocol/types/Struct.java | 13 +-
.../common/requests/UpdateMetadataRequest.java | 47 ++--
.../common/requests/RequestResponseTest.java | 43 ++--
.../src/main/scala/kafka/admin/AdminUtils.scala | 235 ++++++++++++++++---
.../main/scala/kafka/admin/BrokerMetadata.scala | 23 ++
.../main/scala/kafka/admin/RackAwareMode.scala | 42 ++++
.../kafka/admin/ReassignPartitionsCommand.scala | 37 ++-
.../main/scala/kafka/admin/TopicCommand.scala | 5 +-
core/src/main/scala/kafka/cluster/Broker.scala | 112 ++++-----
.../controller/ControllerChannelManager.scala | 39 +--
.../src/main/scala/kafka/server/KafkaApis.scala | 4 +-
.../main/scala/kafka/server/KafkaConfig.scala | 10 +
.../scala/kafka/server/KafkaHealthcheck.scala | 13 +-
.../main/scala/kafka/server/KafkaServer.scala | 2 +-
.../main/scala/kafka/server/MetadataCache.scala | 2 +-
core/src/main/scala/kafka/utils/ZkUtils.scala | 74 +++---
.../api/RackAwareAutoTopicCreationTest.scala | 65 +++++
.../unit/kafka/admin/AdminRackAwareTest.scala | 196 ++++++++++++++++
.../test/scala/unit/kafka/admin/AdminTest.scala | 47 +++-
.../scala/unit/kafka/admin/RackAwareTest.scala | 82 +++++++
.../admin/ReassignPartitionsCommandTest.scala | 51 ++++
.../unit/kafka/admin/TopicCommandTest.scala | 32 ++-
.../unit/kafka/cluster/BrokerEndPointTest.scala | 14 --
.../unit/kafka/server/KafkaConfigTest.scala | 2 +-
.../test/scala/unit/kafka/utils/TestUtils.scala | 33 ++-
docs/upgrade.html | 5 +
27 files changed, 1000 insertions(+), 250 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/951e30ad/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index a77bf8c..e32d0b6 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -697,8 +697,26 @@ public class Protocol {
public static final Schema UPDATE_METADATA_RESPONSE_V1 = UPDATE_METADATA_RESPONSE_V0;
- public static final Schema[] UPDATE_METADATA_REQUEST = new Schema[] {UPDATE_METADATA_REQUEST_V0, UPDATE_METADATA_REQUEST_V1};
- public static final Schema[] UPDATE_METADATA_RESPONSE = new Schema[] {UPDATE_METADATA_RESPONSE_V0, UPDATE_METADATA_RESPONSE_V1};
+ public static final Schema UPDATE_METADATA_REQUEST_PARTITION_STATE_V2 = UPDATE_METADATA_REQUEST_PARTITION_STATE_V1;
+
+ public static final Schema UPDATE_METADATA_REQUEST_END_POINT_V2 = UPDATE_METADATA_REQUEST_END_POINT_V1;
+
+ public static final Schema UPDATE_METADATA_REQUEST_BROKER_V2 =
+ new Schema(new Field("id", INT32, "The broker id."),
+ new Field("end_points", new ArrayOf(UPDATE_METADATA_REQUEST_END_POINT_V2)),
+ new Field("rack", NULLABLE_STRING, "The rack"));
+
+ public static final Schema UPDATE_METADATA_REQUEST_V2 =
+ new Schema(new Field("controller_id", INT32, "The controller id."),
+ new Field("controller_epoch", INT32, "The controller epoch."),
+ new Field("partition_states", new ArrayOf(UPDATE_METADATA_REQUEST_PARTITION_STATE_V2)),
+ new Field("live_brokers", new ArrayOf(UPDATE_METADATA_REQUEST_BROKER_V2)));
+
+ public static final Schema UPDATE_METADATA_RESPONSE_V2 = UPDATE_METADATA_RESPONSE_V1;
+
+
+ public static final Schema[] UPDATE_METADATA_REQUEST = new Schema[] {UPDATE_METADATA_REQUEST_V0, UPDATE_METADATA_REQUEST_V1, UPDATE_METADATA_REQUEST_V2};
+ public static final Schema[] UPDATE_METADATA_RESPONSE = new Schema[] {UPDATE_METADATA_RESPONSE_V0, UPDATE_METADATA_RESPONSE_V1, UPDATE_METADATA_RESPONSE_V2};
/* an array of all requests and responses with all schema versions; a null value in the inner array means that the
* particular version is not supported */
http://git-wip-us.apache.org/repos/asf/kafka/blob/951e30ad/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
index 4902f25..79f0638 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
@@ -311,7 +311,10 @@ public class Struct {
for (Object arrayItem: arrayObject)
result = prime * result + arrayItem.hashCode();
} else {
- result = prime * result + this.get(f).hashCode();
+ Object field = this.get(f);
+ if (field != null) {
+ result = prime * result + field.hashCode();
+ }
}
}
return result;
@@ -330,11 +333,13 @@ public class Struct {
return false;
for (int i = 0; i < this.values.length; i++) {
Field f = this.schema.get(i);
- Boolean result;
+ boolean result;
if (f.type() instanceof ArrayOf) {
- result = Arrays.equals((Object []) this.get(f), (Object []) other.get(f));
+ result = Arrays.equals((Object[]) this.get(f), (Object[]) other.get(f));
} else {
- result = this.get(f).equals(other.get(f));
+ Object thisField = this.get(f);
+ Object otherField = other.get(f);
+ result = (thisField == null && otherField == null) || thisField.equals(otherField);
}
if (!result)
return false;
http://git-wip-us.apache.org/repos/asf/kafka/blob/951e30ad/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
index d8d8013..4c3d0a7 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
@@ -49,16 +49,22 @@ public class UpdateMetadataRequest extends AbstractRequest {
this.zkVersion = zkVersion;
this.replicas = replicas;
}
-
}
public static final class Broker {
public final int id;
public final Map<SecurityProtocol, EndPoint> endPoints;
+ public final String rack;
- public Broker(int id, Map<SecurityProtocol, EndPoint> endPoints) {
+ public Broker(int id, Map<SecurityProtocol, EndPoint> endPoints, String rack) {
this.id = id;
this.endPoints = endPoints;
+ this.rack = rack;
+ }
+
+ @Deprecated
+ public Broker(int id, Map<SecurityProtocol, EndPoint> endPoints) {
+ this(id, endPoints, null);
}
}
@@ -91,6 +97,7 @@ public class UpdateMetadataRequest extends AbstractRequest {
// Broker key names
private static final String BROKER_ID_KEY_NAME = "id";
private static final String ENDPOINTS_KEY_NAME = "end_points";
+ private static final String RACK_KEY_NAME = "rack";
// EndPoint key names
private static final String HOST_KEY_NAME = "host";
@@ -117,20 +124,20 @@ public class UpdateMetadataRequest extends AbstractRequest {
for (BrokerEndPoint brokerEndPoint : brokerEndPoints) {
Map<SecurityProtocol, EndPoint> endPoints = Collections.singletonMap(SecurityProtocol.PLAINTEXT,
new EndPoint(brokerEndPoint.host(), brokerEndPoint.port()));
- brokers.add(new Broker(brokerEndPoint.id(), endPoints));
+ brokers.add(new Broker(brokerEndPoint.id(), endPoints, null));
}
return brokers;
}
/**
- * Constructor for version 1.
+ * Constructor for version 2.
*/
public UpdateMetadataRequest(int controllerId, int controllerEpoch, Map<TopicPartition,
PartitionState> partitionStates, Set<Broker> liveBrokers) {
- this(1, controllerId, controllerEpoch, partitionStates, liveBrokers);
+ this(2, controllerId, controllerEpoch, partitionStates, liveBrokers);
}
- private UpdateMetadataRequest(int version, int controllerId, int controllerEpoch, Map<TopicPartition,
+ public UpdateMetadataRequest(int version, int controllerId, int controllerEpoch, Map<TopicPartition,
PartitionState> partitionStates, Set<Broker> liveBrokers) {
super(new Struct(ProtoUtils.requestSchema(ApiKeys.UPDATE_METADATA_KEY.id, version)));
struct.set(CONTROLLER_ID_KEY_NAME, controllerId);
@@ -173,6 +180,9 @@ public class UpdateMetadataRequest extends AbstractRequest {
}
brokerData.set(ENDPOINTS_KEY_NAME, endPointsData.toArray());
+ if (version >= 2) {
+ brokerData.set(RACK_KEY_NAME, broker.rack);
+ }
}
brokersData.add(brokerData);
@@ -226,8 +236,8 @@ public class UpdateMetadataRequest extends AbstractRequest {
int port = brokerData.getInt(PORT_KEY_NAME);
Map<SecurityProtocol, EndPoint> endPoints = new HashMap<>(1);
endPoints.put(SecurityProtocol.PLAINTEXT, new EndPoint(host, port));
- liveBrokers.add(new Broker(brokerId, endPoints));
- } else { // V1
+ liveBrokers.add(new Broker(brokerId, endPoints, null));
+ } else { // V1 or V2
Map<SecurityProtocol, EndPoint> endPoints = new HashMap<>();
for (Object endPointDataObj : brokerData.getArray(ENDPOINTS_KEY_NAME)) {
Struct endPointData = (Struct) endPointDataObj;
@@ -236,11 +246,13 @@ public class UpdateMetadataRequest extends AbstractRequest {
short protocolTypeId = endPointData.getShort(SECURITY_PROTOCOL_TYPE_KEY_NAME);
endPoints.put(SecurityProtocol.forId(protocolTypeId), new EndPoint(host, port));
}
- liveBrokers.add(new Broker(brokerId, endPoints));
+ String rack = null;
+ if (brokerData.hasField(RACK_KEY_NAME)) { // V2
+ rack = brokerData.getString(RACK_KEY_NAME);
+ }
+ liveBrokers.add(new Broker(brokerId, endPoints, rack));
}
-
}
-
controllerId = struct.getInt(CONTROLLER_ID_KEY_NAME);
controllerEpoch = struct.getInt(CONTROLLER_EPOCH_KEY_NAME);
this.partitionStates = partitionStates;
@@ -249,14 +261,11 @@ public class UpdateMetadataRequest extends AbstractRequest {
@Override
public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
- switch (versionId) {
- case 0:
- case 1:
- return new UpdateMetadataResponse(Errors.forException(e).code());
- default:
- throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
- versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.UPDATE_METADATA_KEY.id)));
- }
+ if (versionId <= 2)
+ return new UpdateMetadataResponse(Errors.forException(e).code());
+ else
+ throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+ versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.UPDATE_METADATA_KEY.id)));
}
public int controllerId() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/951e30ad/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 7ccf079..b556b46 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -86,8 +86,9 @@ public class RequestResponseTest {
createStopReplicaRequest(),
createStopReplicaRequest().getErrorResponse(0, new UnknownServerException()),
createStopReplicaResponse(),
- createUpdateMetadataRequest(1),
- createUpdateMetadataRequest(1).getErrorResponse(1, new UnknownServerException()),
+ createUpdateMetadataRequest(2, "rack1"),
+ createUpdateMetadataRequest(2, null),
+ createUpdateMetadataRequest(2, "rack1").getErrorResponse(2, new UnknownServerException()),
createUpdateMetadataResponse(),
createLeaderAndIsrRequest(),
createLeaderAndIsrRequest().getErrorResponse(0, new UnknownServerException()),
@@ -97,8 +98,11 @@ public class RequestResponseTest {
for (AbstractRequestResponse req : requestResponseList)
checkSerialization(req, null);
- checkSerialization(createUpdateMetadataRequest(0), 0);
- checkSerialization(createUpdateMetadataRequest(0).getErrorResponse(0, new UnknownServerException()), 0);
+ checkSerialization(createUpdateMetadataRequest(0, null), 0);
+ checkSerialization(createUpdateMetadataRequest(0, null).getErrorResponse(0, new UnknownServerException()), 0);
+ checkSerialization(createUpdateMetadataRequest(1, null), 1);
+ checkSerialization(createUpdateMetadataRequest(1, "rack1"), 1);
+ checkSerialization(createUpdateMetadataRequest(1, null).getErrorResponse(1, new UnknownServerException()), 1);
}
private void checkSerialization(AbstractRequestResponse req, Integer version) throws Exception {
@@ -120,7 +124,7 @@ public class RequestResponseTest {
@Test
public void produceResponseVersionTest() {
- Map<TopicPartition, ProduceResponse.PartitionResponse> responseData = new HashMap<TopicPartition, ProduceResponse.PartitionResponse>();
+ Map<TopicPartition, ProduceResponse.PartitionResponse> responseData = new HashMap<>();
responseData.put(new TopicPartition("test", 0), new ProduceResponse.PartitionResponse(Errors.NONE.code(), 10000, Record.NO_TIMESTAMP));
ProduceResponse v0Response = new ProduceResponse(responseData);
ProduceResponse v1Response = new ProduceResponse(responseData, 10, 1);
@@ -138,7 +142,7 @@ public class RequestResponseTest {
@Test
public void fetchResponseVersionTest() {
- Map<TopicPartition, FetchResponse.PartitionData> responseData = new HashMap<TopicPartition, FetchResponse.PartitionData>();
+ Map<TopicPartition, FetchResponse.PartitionData> responseData = new HashMap<>();
responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData(Errors.NONE.code(), 1000000, ByteBuffer.allocate(10)));
FetchResponse v0Response = new FetchResponse(responseData);
@@ -192,14 +196,14 @@ public class RequestResponseTest {
}
private AbstractRequest createFetchRequest() {
- Map<TopicPartition, FetchRequest.PartitionData> fetchData = new HashMap<TopicPartition, FetchRequest.PartitionData>();
+ Map<TopicPartition, FetchRequest.PartitionData> fetchData = new HashMap<>();
fetchData.put(new TopicPartition("test1", 0), new FetchRequest.PartitionData(100, 1000000));
fetchData.put(new TopicPartition("test2", 0), new FetchRequest.PartitionData(200, 1000000));
return new FetchRequest(-1, 100, 100000, fetchData);
}
private AbstractRequestResponse createFetchResponse() {
- Map<TopicPartition, FetchResponse.PartitionData> responseData = new HashMap<TopicPartition, FetchResponse.PartitionData>();
+ Map<TopicPartition, FetchResponse.PartitionData> responseData = new HashMap<>();
responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData(Errors.NONE.code(), 1000000, ByteBuffer.allocate(10)));
return new FetchResponse(responseData, 0);
}
@@ -259,13 +263,13 @@ public class RequestResponseTest {
}
private AbstractRequest createListOffsetRequest() {
- Map<TopicPartition, ListOffsetRequest.PartitionData> offsetData = new HashMap<TopicPartition, ListOffsetRequest.PartitionData>();
+ Map<TopicPartition, ListOffsetRequest.PartitionData> offsetData = new HashMap<>();
offsetData.put(new TopicPartition("test", 0), new ListOffsetRequest.PartitionData(1000000L, 10));
return new ListOffsetRequest(-1, offsetData);
}
private AbstractRequestResponse createListOffsetResponse() {
- Map<TopicPartition, ListOffsetResponse.PartitionData> responseData = new HashMap<TopicPartition, ListOffsetResponse.PartitionData>();
+ Map<TopicPartition, ListOffsetResponse.PartitionData> responseData = new HashMap<>();
responseData.put(new TopicPartition("test", 0), new ListOffsetResponse.PartitionData(Errors.NONE.code(), Arrays.asList(100L)));
return new ListOffsetResponse(responseData);
}
@@ -289,13 +293,13 @@ public class RequestResponseTest {
}
private AbstractRequest createOffsetCommitRequest() {
- Map<TopicPartition, OffsetCommitRequest.PartitionData> commitData = new HashMap<TopicPartition, OffsetCommitRequest.PartitionData>();
+ Map<TopicPartition, OffsetCommitRequest.PartitionData> commitData = new HashMap<>();
commitData.put(new TopicPartition("test", 0), new OffsetCommitRequest.PartitionData(100, ""));
return new OffsetCommitRequest("group1", 100, "consumer1", 1000000, commitData);
}
private AbstractRequestResponse createOffsetCommitResponse() {
- Map<TopicPartition, Short> responseData = new HashMap<TopicPartition, Short>();
+ Map<TopicPartition, Short> responseData = new HashMap<>();
responseData.put(new TopicPartition("test", 0), Errors.NONE.code());
return new OffsetCommitResponse(responseData);
}
@@ -305,19 +309,19 @@ public class RequestResponseTest {
}
private AbstractRequestResponse createOffsetFetchResponse() {
- Map<TopicPartition, OffsetFetchResponse.PartitionData> responseData = new HashMap<TopicPartition, OffsetFetchResponse.PartitionData>();
+ Map<TopicPartition, OffsetFetchResponse.PartitionData> responseData = new HashMap<>();
responseData.put(new TopicPartition("test", 0), new OffsetFetchResponse.PartitionData(100L, "", Errors.NONE.code()));
return new OffsetFetchResponse(responseData);
}
private AbstractRequest createProduceRequest() {
- Map<TopicPartition, ByteBuffer> produceData = new HashMap<TopicPartition, ByteBuffer>();
+ Map<TopicPartition, ByteBuffer> produceData = new HashMap<>();
produceData.put(new TopicPartition("test", 0), ByteBuffer.allocate(10));
return new ProduceRequest((short) 1, 5000, produceData);
}
private AbstractRequestResponse createProduceResponse() {
- Map<TopicPartition, ProduceResponse.PartitionResponse> responseData = new HashMap<TopicPartition, ProduceResponse.PartitionResponse>();
+ Map<TopicPartition, ProduceResponse.PartitionResponse> responseData = new HashMap<>();
responseData.put(new TopicPartition("test", 0), new ProduceResponse.PartitionResponse(Errors.NONE.code(), 10000, Record.NO_TIMESTAMP));
return new ProduceResponse(responseData, 0);
}
@@ -371,7 +375,7 @@ public class RequestResponseTest {
}
@SuppressWarnings("deprecation")
- private AbstractRequest createUpdateMetadataRequest(int version) {
+ private AbstractRequest createUpdateMetadataRequest(int version, String rack) {
Map<TopicPartition, UpdateMetadataRequest.PartitionState> partitionStates = new HashMap<>();
List<Integer> isr = Arrays.asList(1, 2);
List<Integer> replicas = Arrays.asList(1, 2, 3, 4);
@@ -397,11 +401,10 @@ public class RequestResponseTest {
endPoints2.put(SecurityProtocol.PLAINTEXT, new UpdateMetadataRequest.EndPoint("host1", 1244));
endPoints2.put(SecurityProtocol.SSL, new UpdateMetadataRequest.EndPoint("host2", 1234));
- Set<UpdateMetadataRequest.Broker> liveBrokers = new HashSet<>(Arrays.asList(new UpdateMetadataRequest.Broker(0, endPoints1),
- new UpdateMetadataRequest.Broker(1, endPoints2)
+ Set<UpdateMetadataRequest.Broker> liveBrokers = new HashSet<>(Arrays.asList(new UpdateMetadataRequest.Broker(0, endPoints1, rack),
+ new UpdateMetadataRequest.Broker(1, endPoints2, rack)
));
-
- return new UpdateMetadataRequest(1, 10, partitionStates, liveBrokers);
+ return new UpdateMetadataRequest(version, 1, 10, partitionStates, liveBrokers);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/951e30ad/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index 3fb44d3..24174be 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -19,7 +19,6 @@ package kafka.admin
import kafka.common._
import kafka.cluster.Broker
-
import kafka.log.LogConfig
import kafka.server.ConfigType
import kafka.utils._
@@ -32,14 +31,12 @@ import org.apache.kafka.common.errors.{ReplicaNotAvailableException, InvalidTopi
import org.apache.kafka.common.protocol.{Errors, SecurityProtocol}
import org.apache.kafka.common.requests.MetadataResponse
-import scala.Predef._
import scala.collection._
-import scala.collection.JavaConverters._
-import scala.collection.mutable
+import JavaConverters._
import mutable.ListBuffer
+import scala.collection.mutable
import collection.Map
import collection.Set
-
import org.I0Itec.zkclient.exception.ZkNodeExistsException
object AdminUtils extends Logging {
@@ -48,11 +45,13 @@ object AdminUtils extends Logging {
val EntityConfigChangeZnodePrefix = "config_change_"
/**
- * There are 2 goals of replica assignment:
+ * There are 3 goals of replica assignment:
+ *
* 1. Spread the replicas evenly among brokers.
* 2. For partitions assigned to a particular broker, their other replicas are spread over the other brokers.
+ * 3. If all brokers have rack information, assign the replicas for each partition to different racks if possible
*
- * To achieve this goal, we:
+ * To achieve this goal for replica assignment without considering racks, we:
* 1. Assign the first replica of each partition by round-robin, starting from a random position in the broker list.
* 2. Assign the remaining replicas of each partition with an increasing shift.
*
@@ -64,39 +63,177 @@ object AdminUtils extends Logging {
* p8 p9 p5 p6 p7 (2nd replica)
* p3 p4 p0 p1 p2 (3rd replica)
* p7 p8 p9 p5 p6 (3rd replica)
+ *
+ * To create rack aware assignment, this API will first create a rack alternated broker list. For example,
+ * from this brokerID -> rack mapping:
+ *
+ * 0 -> "rack1", 1 -> "rack3", 2 -> "rack3", 3 -> "rack2", 4 -> "rack2", 5 -> "rack1"
+ *
+ * The rack alternated list will be:
+ *
+ * 0, 3, 1, 5, 4, 2
+ *
+ * Then an easy round-robin assignment can be applied. Assume 6 partitions with replication factor of 3, the assignment
+ * will be:
+ *
+ * 0 -> 0,3,1
+ * 1 -> 3,1,5
+ * 2 -> 1,5,4
+ * 3 -> 5,4,2
+ * 4 -> 4,2,0
+ * 5 -> 2,0,3
+ *
+ * Once it has completed the first round-robin, if there are more partitions to assign, the algorithm will start
+ * shifting the followers. This is to ensure we will not always get the same set of sequences.
+ * In this case, if there is another partition to assign (partition #6), the assignment will be:
+ *
+ * 6 -> 0,4,2 (instead of repeating 0,3,1 as in partition 0)
+ *
+ * The rack aware assignment always chooses the 1st replica of the partition using round robin on the rack alternated
+ * broker list. For rest of the replicas, it will be biased towards brokers on racks that do not have
+ * any replica assignment, until every rack has a replica. Then the assignment will go back to round-robin on
+ * the broker list.
+ *
+ * As the result, if the number of replicas is equal to or greater than the number of racks, it will ensure that
+ * each rack will get at least one replica. Otherwise, each rack will get at most one replica. In a perfect
+ * situation where the number of replicas is the same as the number of racks and each rack has the same number of
+ * brokers, it guarantees that the replica distribution is even across brokers and racks.
+ *
+ * @return a Map from partition id to replica ids
+ * @throws AdminOperationException If rack information is supplied but it is incomplete, or if it is not possible to
+ * assign each replica to a unique rack.
+ *
*/
- def assignReplicasToBrokers(brokerList: Seq[Int],
+ def assignReplicasToBrokers(brokerMetadatas: Seq[BrokerMetadata],
nPartitions: Int,
replicationFactor: Int,
fixedStartIndex: Int = -1,
- startPartitionId: Int = -1)
- : Map[Int, Seq[Int]] = {
+ startPartitionId: Int = -1): Map[Int, Seq[Int]] = {
if (nPartitions <= 0)
throw new AdminOperationException("number of partitions must be larger than 0")
if (replicationFactor <= 0)
throw new AdminOperationException("replication factor must be larger than 0")
- if (replicationFactor > brokerList.size)
- throw new AdminOperationException("replication factor: " + replicationFactor +
- " larger than available brokers: " + brokerList.size)
- val ret = new mutable.HashMap[Int, List[Int]]()
- val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerList.size)
- var currentPartitionId = if (startPartitionId >= 0) startPartitionId else 0
+ if (replicationFactor > brokerMetadatas.size)
+ throw new AdminOperationException(s"replication factor: $replicationFactor larger than available brokers: ${brokerMetadatas.size}")
+ if (brokerMetadatas.forall(_.rack.isEmpty))
+ assignReplicasToBrokersRackUnaware(nPartitions, replicationFactor, brokerMetadatas.map(_.id), fixedStartIndex,
+ startPartitionId)
+ else {
+ if (brokerMetadatas.exists(_.rack.isEmpty))
+ throw new AdminOperationException("Not all brokers have rack information for replica rack aware assignment")
+ assignReplicasToBrokersRackAware(nPartitions, replicationFactor, brokerMetadatas, fixedStartIndex,
+ startPartitionId)
+ }
+ }
- var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerList.size)
- for (i <- 0 until nPartitions) {
- if (currentPartitionId > 0 && (currentPartitionId % brokerList.size == 0))
+ private def assignReplicasToBrokersRackUnaware(nPartitions: Int,
+ replicationFactor: Int,
+ brokerList: Seq[Int],
+ fixedStartIndex: Int,
+ startPartitionId: Int): Map[Int, Seq[Int]] = {
+ val ret = mutable.Map[Int, Seq[Int]]()
+ val brokerArray = brokerList.toArray
+ val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
+ var currentPartitionId = math.max(0, startPartitionId)
+ var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
+ for (_ <- 0 until nPartitions) {
+ if (currentPartitionId > 0 && (currentPartitionId % brokerArray.length == 0))
nextReplicaShift += 1
- val firstReplicaIndex = (currentPartitionId + startIndex) % brokerList.size
- var replicaList = List(brokerList(firstReplicaIndex))
+ val firstReplicaIndex = (currentPartitionId + startIndex) % brokerArray.length
+ val replicaBuffer = mutable.ArrayBuffer(brokerArray(firstReplicaIndex))
for (j <- 0 until replicationFactor - 1)
- replicaList ::= brokerList(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerList.size))
- ret.put(currentPartitionId, replicaList.reverse)
- currentPartitionId = currentPartitionId + 1
+ replicaBuffer += brokerArray(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerArray.length))
+ ret.put(currentPartitionId, replicaBuffer)
+ currentPartitionId += 1
}
- ret.toMap
+ ret
+ }
+
+ private def assignReplicasToBrokersRackAware(nPartitions: Int,
+ replicationFactor: Int,
+ brokerMetadatas: Seq[BrokerMetadata],
+ fixedStartIndex: Int,
+ startPartitionId: Int): Map[Int, Seq[Int]] = {
+ val brokerRackMap = brokerMetadatas.collect { case BrokerMetadata(id, Some(rack)) =>
+ id -> rack
+ }.toMap
+ val numRacks = brokerRackMap.values.toSet.size
+ val arrangedBrokerList = getRackAlternatedBrokerList(brokerRackMap)
+ val numBrokers = arrangedBrokerList.size
+ val ret = mutable.Map[Int, Seq[Int]]()
+ val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(arrangedBrokerList.size)
+ var currentPartitionId = math.max(0, startPartitionId)
+ var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(arrangedBrokerList.size)
+ for (_ <- 0 until nPartitions) {
+ if (currentPartitionId > 0 && (currentPartitionId % arrangedBrokerList.size == 0))
+ nextReplicaShift += 1
+ val firstReplicaIndex = (currentPartitionId + startIndex) % arrangedBrokerList.size
+ val leader = arrangedBrokerList(firstReplicaIndex)
+ val replicaBuffer = mutable.ArrayBuffer(leader)
+ val racksWithReplicas = mutable.Set(brokerRackMap(leader))
+ val brokersWithReplicas = mutable.Set(leader)
+ var k = 0
+ for (_ <- 0 until replicationFactor - 1) {
+ var done = false
+ while (!done) {
+ val broker = arrangedBrokerList(replicaIndex(firstReplicaIndex, nextReplicaShift * numRacks, k, arrangedBrokerList.size))
+ val rack = brokerRackMap(broker)
+ // Skip this broker if
+ // 1. there is already a broker in the same rack that has been assigned a replica AND there are one or more racks
+ // that do not have any replica, or
+ // 2. the broker has already been assigned a replica AND there are one or more brokers that do not have a replica assigned
+ if ((!racksWithReplicas.contains(rack) || racksWithReplicas.size == numRacks)
+ && (!brokersWithReplicas.contains(broker) || brokersWithReplicas.size == numBrokers)) {
+ replicaBuffer += broker
+ racksWithReplicas += rack
+ brokersWithReplicas += broker
+ done = true
+ }
+ k += 1
+ }
+ }
+ ret.put(currentPartitionId, replicaBuffer)
+ currentPartitionId += 1
+ }
+ ret
}
+ /**
+ * Given broker and rack information, returns a list of brokers alternated by the rack. Assume
+ * this is the rack and its brokers:
+ *
+ * rack1: 0, 1, 2
+ * rack2: 3, 4, 5
+ * rack3: 6, 7, 8
+ *
+ * This API would return the list of 0, 3, 6, 1, 4, 7, 2, 5, 8
+ *
+ * This is essential to make sure that the assignReplicasToBrokers API can use such list and
+ * assign replicas to brokers in a simple round-robin fashion, while ensuring an even
+ * distribution of leader and replica counts on each broker and that replicas are
+ * distributed to all racks.
+ */
+ private[admin] def getRackAlternatedBrokerList(brokerRackMap: Map[Int, String]): IndexedSeq[Int] = {
+ val brokersIteratorByRack = getInverseMap(brokerRackMap).map { case (rack, brokers) =>
+ (rack, brokers.toIterator)
+ }
+ val racks = brokersIteratorByRack.keys.toArray.sorted
+ val result = new mutable.ArrayBuffer[Int]
+ var rackIndex = 0
+ while (result.size < brokerRackMap.size) {
+ val rackIterator = brokersIteratorByRack(racks(rackIndex))
+ if (rackIterator.hasNext)
+ result += rackIterator.next()
+ rackIndex = (rackIndex + 1) % racks.length
+ }
+ result
+ }
+ private[admin] def getInverseMap(brokerRackMap: Map[Int, String]): Map[String, Seq[Int]] = {
+ brokerRackMap.toSeq.map { case (id, rack) => (rack, id) }
+ .groupBy { case (rack, _) => rack }
+ .map { case (rack, rackAndIdList) => (rack, rackAndIdList.map { case (_, id) => id }.sorted) }
+ }
/**
* Add partitions to existing topic with optional replica assignment
*
@@ -110,7 +247,8 @@ object AdminUtils extends Logging {
topic: String,
numPartitions: Int = 1,
replicaAssignmentStr: String = "",
- checkBrokerAvailable: Boolean = true) {
+ checkBrokerAvailable: Boolean = true,
+ rackAwareMode: RackAwareMode = RackAwareMode.Enforced) {
val existingPartitionsReplicaList = zkUtils.getReplicaAssignmentForTopics(List(topic))
if (existingPartitionsReplicaList.size == 0)
throw new AdminOperationException("The topic %s does not exist".format(topic))
@@ -124,16 +262,16 @@ object AdminUtils extends Logging {
throw new AdminOperationException("The number of partitions for a topic can only be increased")
// create the new partition replication list
- val brokerList = zkUtils.getSortedBrokerList()
- val newPartitionReplicaList = if (replicaAssignmentStr == null || replicaAssignmentStr == "") {
- var startIndex = brokerList.indexWhere(_ >= existingReplicaListForPartitionZero.head)
- if(startIndex < 0) {
- startIndex = 0
+ val brokerMetadatas = getBrokerMetadatas(zkUtils, rackAwareMode)
+ val newPartitionReplicaList =
+ if (replicaAssignmentStr == null || replicaAssignmentStr == "") {
+ val startIndex = math.max(0, brokerMetadatas.indexWhere(_.id >= existingReplicaListForPartitionZero.head))
+ AdminUtils.assignReplicasToBrokers(brokerMetadatas, partitionsToAdd, existingReplicaListForPartitionZero.size,
+ startIndex, existingPartitionsReplicaList.size)
}
- AdminUtils.assignReplicasToBrokers(brokerList, partitionsToAdd, existingReplicaListForPartitionZero.size, startIndex, existingPartitionsReplicaList.size)
- }
- else
- getManualReplicaAssignment(replicaAssignmentStr, brokerList.toSet, existingPartitionsReplicaList.size, checkBrokerAvailable)
+ else
+ getManualReplicaAssignment(replicaAssignmentStr, brokerMetadatas.map(_.id).toSet,
+ existingPartitionsReplicaList.size, checkBrokerAvailable)
// check if manual assignment has the right replication factor
val unmatchedRepFactorList = newPartitionReplicaList.values.filter(p => (p.size != existingReplicaListForPartitionZero.size))
@@ -237,13 +375,32 @@ object AdminUtils extends Logging {
def topicExists(zkUtils: ZkUtils, topic: String): Boolean =
zkUtils.zkClient.exists(getTopicPath(topic))
+ def getBrokerMetadatas(zkUtils: ZkUtils, rackAwareMode: RackAwareMode = RackAwareMode.Enforced,
+ brokerList: Option[Seq[Int]] = None): Seq[BrokerMetadata] = {
+ val allBrokers = zkUtils.getAllBrokersInCluster()
+ val brokers = brokerList.map(brokerIds => allBrokers.filter(b => brokerIds.contains(b.id))).getOrElse(allBrokers)
+ val brokersWithRack = brokers.filter(_.rack.nonEmpty)
+ if (rackAwareMode == RackAwareMode.Enforced && brokersWithRack.nonEmpty && brokersWithRack.size < brokers.size) {
+ throw new AdminOperationException("Not all brokers have rack information. Add --disable-rack-aware in command line" +
+ " to make replica assignment without rack information.")
+ }
+ val brokerMetadatas = rackAwareMode match {
+ case RackAwareMode.Disabled => brokers.map(broker => BrokerMetadata(broker.id, None))
+ case RackAwareMode.Safe if brokersWithRack.size < brokers.size =>
+ brokers.map(broker => BrokerMetadata(broker.id, None))
+ case _ => brokers.map(broker => BrokerMetadata(broker.id, broker.rack))
+ }
+ brokerMetadatas.sortBy(_.id)
+ }
+
def createTopic(zkUtils: ZkUtils,
topic: String,
partitions: Int,
replicationFactor: Int,
- topicConfig: Properties = new Properties) {
- val brokerList = zkUtils.getSortedBrokerList()
- val replicaAssignment = AdminUtils.assignReplicasToBrokers(brokerList, partitions, replicationFactor)
+ topicConfig: Properties = new Properties,
+ rackAwareMode: RackAwareMode = RackAwareMode.Enforced) {
+ val brokerMetadatas = getBrokerMetadatas(zkUtils, rackAwareMode)
+ val replicaAssignment = AdminUtils.assignReplicasToBrokers(brokerMetadatas, partitions, replicationFactor)
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, replicaAssignment, topicConfig)
}
@@ -304,6 +461,7 @@ object AdminUtils extends Logging {
/**
* Update the config for a client and create a change notification so the change will propagate to other brokers
+ *
* @param zkUtils Zookeeper utilities used to write the config to ZK
* @param clientId: The clientId for which configs are being changed
* @param configs: The final set of configs that will be applied to the topic. If any new configs need to be added or
@@ -316,6 +474,7 @@ object AdminUtils extends Logging {
/**
* Update the config for an existing topic and create a change notification so the change will propagate to other brokers
+ *
* @param zkUtils Zookeeper utilities used to write the config to ZK
* @param topic: The topic for which configs are being changed
* @param configs: The final set of configs that will be applied to the topic. If any new configs need to be added or
http://git-wip-us.apache.org/repos/asf/kafka/blob/951e30ad/core/src/main/scala/kafka/admin/BrokerMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/BrokerMetadata.scala b/core/src/main/scala/kafka/admin/BrokerMetadata.scala
new file mode 100644
index 0000000..86831e3
--- /dev/null
+++ b/core/src/main/scala/kafka/admin/BrokerMetadata.scala
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package kafka.admin
+
+/**
+ * Broker metadata used by admin tools.
+ *
+ * @param id an integer that uniquely identifies this broker
+ * @param rack the rack of the broker, which is used in rack-aware partition assignment for fault tolerance.
+ * Examples: "RACK1", "us-east-1d"
+ */
+case class BrokerMetadata(id: Int, rack: Option[String])
http://git-wip-us.apache.org/repos/asf/kafka/blob/951e30ad/core/src/main/scala/kafka/admin/RackAwareMode.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/RackAwareMode.scala b/core/src/main/scala/kafka/admin/RackAwareMode.scala
new file mode 100644
index 0000000..45555b6
--- /dev/null
+++ b/core/src/main/scala/kafka/admin/RackAwareMode.scala
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.admin
+
+/**
+ * Mode to control how rack aware replica assignment will be executed
+ */
+object RackAwareMode {
+
+ /**
+ * Ignore all rack information in replica assignment. This is an optional mode used in command line.
+ */
+ case object Disabled extends RackAwareMode
+
+ /**
+ * Assume every broker has a rack, or none of the brokers has a rack. If only some brokers have rack information, fail fast
+ * in replica assignment. This is the default mode in command line tools (TopicCommand and ReassignPartitionsCommand).
+ */
+ case object Enforced extends RackAwareMode
+
+ /**
+ * Use rack information if every broker has a rack. Otherwise, fallback to Disabled mode. This is used in auto topic
+ * creation.
+ */
+ case object Safe extends RackAwareMode
+}
+
+sealed trait RackAwareMode
http://git-wip-us.apache.org/repos/asf/kafka/blob/951e30ad/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index 13e423d..446ab9f 100755
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -91,23 +91,33 @@ object ReassignPartitionsCommand extends Logging {
if (duplicateReassignments.nonEmpty)
throw new AdminCommandFailedException("Broker list contains duplicate entries: %s".format(duplicateReassignments.mkString(",")))
val topicsToMoveJsonString = Utils.readFileAsString(topicsToMoveJsonFile)
+ val disableRackAware = opts.options.has(opts.disableRackAware)
+ val (proposedAssignments, currentAssignments) = generateAssignment(zkUtils, brokerListToReassign, topicsToMoveJsonString, disableRackAware)
+ println("Current partition replica assignment\n\n%s".format(zkUtils.getPartitionReassignmentZkData(currentAssignments)))
+ println("Proposed partition reassignment configuration\n\n%s".format(zkUtils.getPartitionReassignmentZkData(proposedAssignments)))
+ }
+
+ def generateAssignment(zkUtils: ZkUtils, brokerListToReassign: Seq[Int], topicsToMoveJsonString: String, disableRackAware: Boolean): (Map[TopicAndPartition, Seq[Int]], Map[TopicAndPartition, Seq[Int]]) = {
val topicsToReassign = zkUtils.parseTopicsData(topicsToMoveJsonString)
val duplicateTopicsToReassign = CoreUtils.duplicates(topicsToReassign)
if (duplicateTopicsToReassign.nonEmpty)
throw new AdminCommandFailedException("List of topics to reassign contains duplicate entries: %s".format(duplicateTopicsToReassign.mkString(",")))
- val topicPartitionsToReassign = zkUtils.getReplicaAssignmentForTopics(topicsToReassign)
-
- var partitionsToBeReassigned : Map[TopicAndPartition, Seq[Int]] = new mutable.HashMap[TopicAndPartition, List[Int]]()
- val groupedByTopic = topicPartitionsToReassign.groupBy(tp => tp._1.topic)
- groupedByTopic.foreach { topicInfo =>
- val assignedReplicas = AdminUtils.assignReplicasToBrokers(brokerListToReassign, topicInfo._2.size,
- topicInfo._2.head._2.size)
- partitionsToBeReassigned ++= assignedReplicas.map(replicaInfo => (TopicAndPartition(topicInfo._1, replicaInfo._1) -> replicaInfo._2))
+ val currentAssignment = zkUtils.getReplicaAssignmentForTopics(topicsToReassign)
+
+ val groupedByTopic = currentAssignment.groupBy { case (tp, _) => tp.topic }
+ val rackAwareMode = if (disableRackAware) RackAwareMode.Disabled else RackAwareMode.Enforced
+ val brokerMetadatas = AdminUtils.getBrokerMetadatas(zkUtils, rackAwareMode, Some(brokerListToReassign))
+
+ val partitionsToBeReassigned = mutable.Map[TopicAndPartition, Seq[Int]]()
+ groupedByTopic.foreach { case (topic, assignment) =>
+ val (_, replicas) = assignment.head
+ val assignedReplicas = AdminUtils.assignReplicasToBrokers(brokerMetadatas, assignment.size, replicas.size)
+ partitionsToBeReassigned ++= assignedReplicas.map { case (partition, replicas) =>
+ (TopicAndPartition(topic, partition) -> replicas)
+ }
}
- val currentPartitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(partitionsToBeReassigned.map(_._1.topic).toSeq)
- println("Current partition replica assignment\n\n%s"
- .format(zkUtils.getPartitionReassignmentZkData(currentPartitionReplicaAssignment)))
- println("Proposed partition reassignment configuration\n\n%s".format(zkUtils.getPartitionReassignmentZkData(partitionsToBeReassigned)))
+
+ (partitionsToBeReassigned, currentAssignment)
}
def executeAssignment(zkUtils: ZkUtils, opts: ReassignPartitionsCommandOptions) {
@@ -200,7 +210,8 @@ object ReassignPartitionsCommand extends Logging {
.withRequiredArg
.describedAs("brokerlist")
.ofType(classOf[String])
-
+ val disableRackAware = parser.accepts("disable-rack-aware", "Disable rack aware replica assignment")
+
if(args.length == 0)
CommandLineUtils.printUsageAndDie(parser, "This command moves topic partitions between replicas.")
http://git-wip-us.apache.org/repos/asf/kafka/blob/951e30ad/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index d4212c5..e89e09d 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -105,7 +105,9 @@ object TopicCommand extends Logging {
val partitions = opts.options.valueOf(opts.partitionsOpt).intValue
val replicas = opts.options.valueOf(opts.replicationFactorOpt).intValue
warnOnMaxMessagesChange(configs, replicas)
- AdminUtils.createTopic(zkUtils, topic, partitions, replicas, configs)
+ val rackAwareMode = if (opts.options.has(opts.disableRackAware)) RackAwareMode.Disabled
+ else RackAwareMode.Enforced
+ AdminUtils.createTopic(zkUtils, topic, partitions, replicas, configs, rackAwareMode)
}
println("Created topic \"%s\".".format(topic))
} catch {
@@ -324,6 +326,7 @@ object TopicCommand extends Logging {
val ifNotExistsOpt = parser.accepts("if-not-exists",
"if set when creating topics, the action will only execute if the topic does not already exist")
+ val disableRackAware = parser.accepts("disable-rack-aware", "Disable rack aware replica assignment")
val options = parser.parse(args : _*)
val allTopicLevelOpts: Set[OptionSpec[_]] = Set(alterOpt, createOpt, describeOpt, listOpt, deleteOpt)
http://git-wip-us.apache.org/repos/asf/kafka/blob/951e30ad/core/src/main/scala/kafka/cluster/Broker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Broker.scala b/core/src/main/scala/kafka/cluster/Broker.scala
index 7340f14..77b85e0 100755
--- a/core/src/main/scala/kafka/cluster/Broker.scala
+++ b/core/src/main/scala/kafka/cluster/Broker.scala
@@ -19,6 +19,7 @@ package kafka.cluster
import java.nio.ByteBuffer
+import kafka.api.ApiUtils._
import kafka.common.{BrokerEndPointNotAvailableException, BrokerNotAvailableException, KafkaException}
import kafka.utils.Json
import org.apache.kafka.common.Node
@@ -32,26 +33,41 @@ import org.apache.kafka.common.protocol.SecurityProtocol
object Broker {
/**
- * Create a broker object from id and JSON string.
- * @param id
- * @param brokerInfoString
- *
- * Version 1 JSON schema for a broker is:
- * {"version":1,
- * "host":"localhost",
- * "port":9092
- * "jmx_port":9999,
- * "timestamp":"2233345666" }
- *
- * The current JSON schema for a broker is:
- * {"version":2,
- * "host","localhost",
- * "port",9092
- * "jmx_port":9999,
- * "timestamp":"2233345666",
- * "endpoints": ["PLAINTEXT://host1:9092",
- * "SSL://host1:9093"]
- */
+ * Create a broker object from id and JSON string.
+ *
+ * @param id
+ * @param brokerInfoString
+ *
+ * Version 1 JSON schema for a broker is:
+ * {
+ * "version":1,
+ * "host":"localhost",
+ * "port":9092
+ * "jmx_port":9999,
+ * "timestamp":"2233345666"
+ * }
+ *
+ * Version 2 JSON schema for a broker is:
+ * {
+ * "version":2,
+ * "host":"localhost",
+ * "port":9092
+ * "jmx_port":9999,
+ * "timestamp":"2233345666",
+ * "endpoints":["PLAINTEXT://host1:9092", "SSL://host1:9093"]
+ * }
+ *
+ * Version 3 (current) JSON schema for a broker is:
+ * {
+ * "version":3,
+ * "host":"localhost",
+ * "port":9092
+ * "jmx_port":9999,
+ * "timestamp":"2233345666",
+ * "endpoints":["PLAINTEXT://host1:9092", "SSL://host1:9093"],
+ * "rack":"dc1"
+ * }
+ */
def createBroker(id: Int, brokerInfoString: String): Broker = {
if (brokerInfoString == null)
throw new BrokerNotAvailableException(s"Broker id $id does not exist")
@@ -75,9 +91,8 @@ object Broker {
(ep.protocolType, ep)
}.toMap
}
-
-
- new Broker(id, endpoints)
+ val rack = brokerInfo.get("rack").filter(_ != null).map(_.asInstanceOf[String])
+ new Broker(id, endpoints, rack)
case None =>
throw new BrokerNotAvailableException(s"Broker id $id does not exist")
}
@@ -86,61 +101,34 @@ object Broker {
throw new KafkaException(s"Failed to parse the broker info from zookeeper: $brokerInfoString", t)
}
}
-
- /**
- *
- * @param buffer Containing serialized broker.
- * Current serialization is:
- * id (int), number of endpoints (int), serialized endpoints
- * @return broker object
- */
- def readFrom(buffer: ByteBuffer): Broker = {
- val id = buffer.getInt
- val numEndpoints = buffer.getInt
-
- val endpoints = List.range(0, numEndpoints).map(i => EndPoint.readFrom(buffer))
- .map(ep => ep.protocolType -> ep).toMap
- new Broker(id, endpoints)
- }
}
-case class Broker(id: Int, endPoints: collection.Map[SecurityProtocol, EndPoint]) {
+case class Broker(id: Int, endPoints: collection.Map[SecurityProtocol, EndPoint], rack: Option[String]) {
- override def toString: String = id + " : " + endPoints.values.mkString("(",",",")")
+ override def toString: String =
+ s"$id : ${endPoints.values.mkString("(",",",")")} : ${rack.orNull}"
+
+ def this(id: Int, endPoints: Map[SecurityProtocol, EndPoint]) = {
+ this(id, endPoints, None)
+ }
def this(id: Int, host: String, port: Int, protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT) = {
- this(id, Map(protocol -> EndPoint(host, port, protocol)))
+ this(id, Map(protocol -> EndPoint(host, port, protocol)), None)
}
def this(bep: BrokerEndPoint, protocol: SecurityProtocol) = {
this(bep.id, bep.host, bep.port, protocol)
}
-
- def writeTo(buffer: ByteBuffer) {
- buffer.putInt(id)
- buffer.putInt(endPoints.size)
- for(endpoint <- endPoints.values) {
- endpoint.writeTo(buffer)
- }
- }
-
- def sizeInBytes: Int =
- 4 + /* broker id*/
- 4 + /* number of endPoints */
- endPoints.values.map(_.sizeInBytes).sum /* end points */
-
- def supportsChannel(protocolType: SecurityProtocol): Unit = {
- endPoints.contains(protocolType)
- }
-
def getNode(protocolType: SecurityProtocol): Node = {
- val endpoint = endPoints.getOrElse(protocolType, throw new BrokerEndPointNotAvailableException("End point %s not found for broker %d".format(protocolType,id)))
+ val endpoint = endPoints.getOrElse(protocolType,
+ throw new BrokerEndPointNotAvailableException(s"End point with security protocol $protocolType not found for broker $id"))
new Node(id, endpoint.host, endpoint.port)
}
def getBrokerEndPoint(protocolType: SecurityProtocol): BrokerEndPoint = {
- val endpoint = endPoints.getOrElse(protocolType, throw new BrokerEndPointNotAvailableException("End point %s not found for broker %d".format(protocolType,id)))
+ val endpoint = endPoints.getOrElse(protocolType,
+ throw new BrokerEndPointNotAvailableException(s"End point with security protocol $protocolType not found for broker $id"))
new BrokerEndPoint(id, endpoint.host, endpoint.port)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/951e30ad/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 3b1a458..ea156fa 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -16,24 +16,25 @@
*/
package kafka.controller
-import kafka.api.{LeaderAndIsr, KAFKA_0_9_0, PartitionStateInfo}
+import java.net.SocketTimeoutException
+import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue}
+
+import kafka.api._
+import kafka.cluster.Broker
+import kafka.common.{KafkaException, TopicAndPartition}
+import kafka.server.KafkaConfig
import kafka.utils._
-import org.apache.kafka.clients.{ClientResponse, ClientRequest, ManualMetadataUpdater, NetworkClient}
-import org.apache.kafka.common.{BrokerEndPoint, TopicPartition, Node}
+import org.apache.kafka.clients.{ClientRequest, ClientResponse, ManualMetadataUpdater, NetworkClient}
import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.network.{LoginType, Selectable, ChannelBuilders, Selector, NetworkReceive, Mode}
-import org.apache.kafka.common.protocol.{SecurityProtocol, ApiKeys}
-import org.apache.kafka.common.requests._
+import org.apache.kafka.common.network.{ChannelBuilders, LoginType, Mode, NetworkReceive, Selectable, Selector}
+import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol}
+import org.apache.kafka.common.requests.{UpdateMetadataRequest, _}
import org.apache.kafka.common.utils.Time
-import collection.mutable.HashMap
-import kafka.cluster.Broker
-import java.net.{SocketTimeoutException}
-import java.util.concurrent.{LinkedBlockingQueue, BlockingQueue}
-import kafka.server.KafkaConfig
-import collection.mutable
-import kafka.common.{KafkaException, TopicAndPartition}
-import collection.Set
-import collection.JavaConverters._
+import org.apache.kafka.common.{BrokerEndPoint, Node, TopicPartition}
+
+import scala.collection.JavaConverters._
+import scala.collection.{Set, mutable}
+import scala.collection.mutable.HashMap
class ControllerChannelManager(controllerContext: ControllerContext, config: KafkaConfig, time: Time, metrics: Metrics, threadNamePrefix: Option[String] = None) extends Logging {
protected val brokerStateInfo = new HashMap[Int, ControllerBrokerStateInfo]
@@ -380,7 +381,9 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging
topicPartition -> partitionState
}
- val version = if (controller.config.interBrokerProtocolVersion >= KAFKA_0_9_0) (1: Short) else (0: Short)
+ val version = if (controller.config.interBrokerProtocolVersion >= KAFKA_0_10_0_IV0) 2: Short
+ else if (controller.config.interBrokerProtocolVersion >= KAFKA_0_9_0) 1: Short
+ else 0: Short
val updateMetadataRequest =
if (version == 0) {
@@ -395,9 +398,9 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging
val endPoints = broker.endPoints.map { case (securityProtocol, endPoint) =>
securityProtocol -> new UpdateMetadataRequest.EndPoint(endPoint.host, endPoint.port)
}
- new UpdateMetadataRequest.Broker(broker.id, endPoints.asJava)
+ new UpdateMetadataRequest.Broker(broker.id, endPoints.asJava, broker.rack.orNull)
}
- new UpdateMetadataRequest(controllerId, controllerEpoch, partitionStates.asJava, liveBrokers.asJava)
+ new UpdateMetadataRequest(version, controllerId, controllerEpoch, partitionStates.asJava, liveBrokers.asJava)
}
controller.sendRequest(broker, ApiKeys.UPDATE_METADATA_KEY, Some(version), updateMetadataRequest, null)
http://git-wip-us.apache.org/repos/asf/kafka/blob/951e30ad/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 5f9ec8b..452f721 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -21,7 +21,7 @@ import java.nio.ByteBuffer
import java.lang.{Long => JLong, Short => JShort}
import java.util.Properties
-import kafka.admin.AdminUtils
+import kafka.admin.{RackAwareMode, AdminUtils}
import kafka.api._
import kafka.cluster.Partition
import kafka.common._
@@ -624,7 +624,7 @@ class KafkaApis(val requestChannel: RequestChannel,
replicationFactor: Int,
properties: Properties = new Properties()): MetadataResponse.TopicMetadata = {
try {
- AdminUtils.createTopic(zkUtils, topic, numPartitions, replicationFactor, properties)
+ AdminUtils.createTopic(zkUtils, topic, numPartitions, replicationFactor, properties, RackAwareMode.Safe)
info("Auto creation of topic %s with %d partitions and replication factor %d is successful"
.format(topic, numPartitions, replicationFactor))
new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, topic, java.util.Collections.emptyList())
http://git-wip-us.apache.org/repos/asf/kafka/blob/951e30ad/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 8d14edd..9c24876 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -222,6 +222,8 @@ object KafkaConfig {
val MaxConnectionsPerIpProp = "max.connections.per.ip"
val MaxConnectionsPerIpOverridesProp = "max.connections.per.ip.overrides"
val ConnectionsMaxIdleMsProp = "connections.max.idle.ms"
+ /***************** rack configuration *************/
+ val RackProp = "broker.rack"
/** ********* Log Configuration ***********/
val NumPartitionsProp = "num.partitions"
val LogDirsProp = "log.dirs"
@@ -388,6 +390,8 @@ object KafkaConfig {
val MaxConnectionsPerIpDoc = "The maximum number of connections we allow from each ip address"
val MaxConnectionsPerIpOverridesDoc = "Per-ip or hostname overrides to the default maximum number of connections"
val ConnectionsMaxIdleMsDoc = "Idle connections timeout: the server socket processor threads close the connections that idle more than this"
+ /************* Rack Configuration **************/
+ val RackDoc = "Rack of the broker. This will be used in rack aware replication assignment for fault tolerance. Examples: `RACK1`, `us-east-1d`"
/** ********* Log Configuration ***********/
val NumPartitionsDoc = "The default number of log partitions per topic"
val LogDirDoc = "The directory in which the log data is kept (supplemental for " + LogDirsProp + " property)"
@@ -571,6 +575,9 @@ object KafkaConfig {
.define(MaxConnectionsPerIpOverridesProp, STRING, Defaults.MaxConnectionsPerIpOverrides, MEDIUM, MaxConnectionsPerIpOverridesDoc)
.define(ConnectionsMaxIdleMsProp, LONG, Defaults.ConnectionsMaxIdleMs, MEDIUM, ConnectionsMaxIdleMsDoc)
+ /************ Rack Configuration ******************/
+ .define(RackProp, STRING, null, MEDIUM, RackDoc)
+
/** ********* Log Configuration ***********/
.define(NumPartitionsProp, INT, Defaults.NumPartitions, atLeast(1), MEDIUM, NumPartitionsDoc)
.define(LogDirProp, STRING, Defaults.LogDir, HIGH, LogDirDoc)
@@ -771,6 +778,9 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra
getMap(KafkaConfig.MaxConnectionsPerIpOverridesProp, getString(KafkaConfig.MaxConnectionsPerIpOverridesProp)).map { case (k, v) => (k, v.toInt)}
val connectionsMaxIdleMs = getLong(KafkaConfig.ConnectionsMaxIdleMsProp)
+ /***************** rack configuration **************/
+ val rack = Option(getString(KafkaConfig.RackProp))
+
/** ********* Log Configuration ***********/
val autoCreateTopicsEnable = getBoolean(KafkaConfig.AutoCreateTopicsEnableProp)
val numPartitions = getInt(KafkaConfig.NumPartitionsProp)
http://git-wip-us.apache.org/repos/asf/kafka/blob/951e30ad/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
index 928ff43..2598e6d 100644
--- a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
+++ b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
@@ -17,12 +17,14 @@
package kafka.server
+import java.net.InetAddress
+
+import kafka.api.ApiVersion
import kafka.cluster.EndPoint
import kafka.utils._
+import org.I0Itec.zkclient.IZkStateListener
import org.apache.kafka.common.protocol.SecurityProtocol
import org.apache.zookeeper.Watcher.Event.KeeperState
-import org.I0Itec.zkclient.{IZkStateListener, ZkClient, ZkConnection}
-import java.net.InetAddress
/**
@@ -35,7 +37,9 @@ import java.net.InetAddress
*/
class KafkaHealthcheck(private val brokerId: Int,
private val advertisedEndpoints: Map[SecurityProtocol, EndPoint],
- private val zkUtils: ZkUtils) extends Logging {
+ private val zkUtils: ZkUtils,
+ private val rack: Option[String],
+ private val interBrokerProtocolVersion: ApiVersion) extends Logging {
val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + brokerId
val sessionExpireListener = new SessionExpireListener
@@ -61,7 +65,8 @@ class KafkaHealthcheck(private val brokerId: Int,
// only PLAINTEXT is supported as default
// if the broker doesn't listen on PLAINTEXT protocol, an empty endpoint will be registered and older clients will be unable to connect
val plaintextEndpoint = updatedEndpoints.getOrElse(SecurityProtocol.PLAINTEXT, new EndPoint(null,-1,null))
- zkUtils.registerBrokerInZk(brokerId, plaintextEndpoint.host, plaintextEndpoint.port, updatedEndpoints, jmxPort)
+ zkUtils.registerBrokerInZk(brokerId, plaintextEndpoint.host, plaintextEndpoint.port, updatedEndpoints, jmxPort, rack,
+ interBrokerProtocolVersion)
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/951e30ad/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 2f5441a..e29494b 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -239,7 +239,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
else
(protocol, endpoint)
}
- kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners, zkUtils)
+ kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners, zkUtils, config.rack, config.interBrokerProtocolVersion)
kafkaHealthcheck.startup()
// Now that the broker id is successfully registered via KafkaHealthcheck, checkpoint it
http://git-wip-us.apache.org/repos/asf/kafka/blob/951e30ad/core/src/main/scala/kafka/server/MetadataCache.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index 1fdd717..6df261c 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -159,7 +159,7 @@ private[server] class MetadataCache(brokerId: Int) extends Logging {
endPoints.put(protocol, EndPoint(ep.host, ep.port, protocol))
nodes.put(protocol, new Node(broker.id, ep.host, ep.port))
}
- aliveBrokers(broker.id) = Broker(broker.id, endPoints.asScala)
+ aliveBrokers(broker.id) = Broker(broker.id, endPoints.asScala, Option(broker.rack))
aliveNodes(broker.id) = nodes.asScala
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/951e30ad/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index f39ed01..99c8196 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -18,31 +18,26 @@
package kafka.utils
import java.util.concurrent.CountDownLatch
+
+import kafka.admin._
+import kafka.api.{ApiVersion, KAFKA_0_10_0_IV0, LeaderAndIsr}
import kafka.cluster._
+import kafka.common.{KafkaException, NoEpochForPartitionException, TopicAndPartition}
import kafka.consumer.{ConsumerThreadId, TopicCount}
+import kafka.controller.{KafkaController, LeaderIsrAndControllerEpoch, ReassignedPartitionsContext}
import kafka.server.ConfigType
-import org.I0Itec.zkclient.{ZkClient,ZkConnection}
-import org.I0Itec.zkclient.exception.{ZkException, ZkNodeExistsException, ZkNoNodeException,
- ZkMarshallingError, ZkBadVersionException}
+import kafka.utils.ZkUtils._
+import org.I0Itec.zkclient.exception.{ZkBadVersionException, ZkException, ZkMarshallingError, ZkNoNodeException, ZkNodeExistsException}
import org.I0Itec.zkclient.serialize.ZkSerializer
+import org.I0Itec.zkclient.{ZkClient, ZkConnection}
import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.common.protocol.SecurityProtocol
-import org.apache.zookeeper.ZooDefs
-import scala.collection._
-import kafka.api.LeaderAndIsr
-import org.apache.zookeeper.data.{ACL, Stat}
-import kafka.admin._
-import kafka.common.{KafkaException, NoEpochForPartitionException}
-import kafka.controller.ReassignedPartitionsContext
-import kafka.controller.KafkaController
-import kafka.controller.LeaderIsrAndControllerEpoch
-import kafka.common.TopicAndPartition
-import kafka.utils.ZkUtils._
-import org.apache.zookeeper.AsyncCallback.{DataCallback,StringCallback}
-import org.apache.zookeeper.CreateMode
-import org.apache.zookeeper.KeeperException
+import org.apache.zookeeper.AsyncCallback.{DataCallback, StringCallback}
import org.apache.zookeeper.KeeperException.Code
-import org.apache.zookeeper.ZooKeeper
+import org.apache.zookeeper.data.{ACL, Stat}
+import org.apache.zookeeper.{CreateMode, KeeperException, ZooDefs, ZooKeeper}
+
+import scala.collection._
object ZkUtils {
val ConsumersPath = "/consumers"
@@ -256,19 +251,43 @@ class ZkUtils(val zkClient: ZkClient,
}
/**
- * Register brokers with v2 json format (which includes multiple endpoints).
+ * Register brokers with v3 json format (which includes multiple endpoints and rack) if
+ * the apiVersion is 0.10.0.X or above. Register the broker with v2 json format otherwise.
+ * Due to KAFKA-3100, 0.9.0.0 broker and old clients will break if JSON version is above 2.
+ * We include v2 to make it possible for the broker to migrate from 0.9.0.0 to 0.10.0.X without having to upgrade
+ * to 0.9.0.1 first (clients have to be upgraded to 0.9.0.1 in any case).
+ *
* This format also includes default endpoints for compatibility with older clients.
- * @param id
- * @param host
- * @param port
- * @param advertisedEndpoints
- * @param jmxPort
+ *
+ * @param id broker ID
+ * @param host broker host name
+ * @param port broker port
+ * @param advertisedEndpoints broker end points
+ * @param jmxPort jmx port
+ * @param rack broker rack
+ * @param apiVersion Kafka version the broker is running as
*/
- def registerBrokerInZk(id: Int, host: String, port: Int, advertisedEndpoints: collection.Map[SecurityProtocol, EndPoint], jmxPort: Int) {
+ def registerBrokerInZk(id: Int,
+ host: String,
+ port: Int,
+ advertisedEndpoints: collection.Map[SecurityProtocol, EndPoint],
+ jmxPort: Int,
+ rack: Option[String],
+ apiVersion: ApiVersion) {
val brokerIdPath = BrokerIdsPath + "/" + id
val timestamp = SystemTime.milliseconds.toString
- val brokerInfo = Json.encode(Map("version" -> 2, "host" -> host, "port" -> port, "endpoints"->advertisedEndpoints.values.map(_.connectionString).toArray, "jmx_port" -> jmxPort, "timestamp" -> timestamp))
+ val version = if (apiVersion >= KAFKA_0_10_0_IV0) 3 else 2
+ var jsonMap = Map("version" -> version,
+ "host" -> host,
+ "port" -> port,
+ "endpoints" -> advertisedEndpoints.values.map(_.connectionString).toArray,
+ "jmx_port" -> jmxPort,
+ "timestamp" -> timestamp
+ )
+ rack.foreach(rack => if (version >= 3) jsonMap += ("rack" -> rack))
+
+ val brokerInfo = Json.encode(jsonMap)
registerBrokerInZk(brokerIdPath, brokerInfo)
info("Registered broker %d at path %s with addresses: %s".format(id, brokerIdPath, advertisedEndpoints.mkString(",")))
@@ -745,6 +764,7 @@ class ZkUtils(val zkClient: ZkClient,
/**
* This API takes in a broker id, queries zookeeper for the broker metadata and returns the metadata for that broker
* or throws an exception if the broker dies before the query to zookeeper finishes
+ *
* @param brokerId The broker id
* @return An optional Broker object encapsulating the broker metadata
*/
@@ -768,7 +788,6 @@ class ZkUtils(val zkClient: ZkClient,
case e: ZkNoNodeException => {
createParentPath(BrokerSequenceIdPath, acls)
try {
- import scala.collection.JavaConversions._
zkClient.createPersistent(BrokerSequenceIdPath, "", acls)
0
} catch {
@@ -880,7 +899,6 @@ class ZKConfig(props: VerifiableProperties) {
object ZkPath {
@volatile private var isNamespacePresent: Boolean = false
- import scala.collection.JavaConversions._
def checkNamespace(client: ZkClient) {
if(isNamespacePresent)
http://git-wip-us.apache.org/repos/asf/kafka/blob/951e30ad/core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala b/core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala
new file mode 100644
index 0000000..a2f2041
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.api
+
+import java.util.Properties
+
+import kafka.admin.{RackAwareMode, AdminUtils, RackAwareTest}
+import kafka.integration.KafkaServerTestHarness
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.junit.Assert._
+import org.junit.Test
+import scala.collection.Map
+
+class RackAwareAutoTopicCreationTest extends KafkaServerTestHarness with RackAwareTest {
+ val numServers = 4
+ val numPartitions = 8
+ val replicationFactor = 2
+ val overridingProps = new Properties()
+ overridingProps.put(KafkaConfig.NumPartitionsProp, numPartitions.toString)
+ overridingProps.put(KafkaConfig.DefaultReplicationFactorProp, replicationFactor.toString)
+
+ def generateConfigs() =
+ (0 until numServers) map { node =>
+ TestUtils.createBrokerConfig(node, zkConnect, enableControlledShutdown = false, rack = Some((node / 2).toString))
+ } map (KafkaConfig.fromProps(_, overridingProps))
+
+ private val topic = "topic"
+
+ @Test
+ def testAutoCreateTopic() {
+ val producer = TestUtils.createNewProducer(brokerList, retries = 5)
+ try {
+ // Send a message to auto-create the topic
+ val record = new ProducerRecord(topic, null, "key".getBytes, "value".getBytes)
+ assertEquals("Should have offset 0", 0L, producer.send(record).get.offset)
+
+ // double check that the topic is created with leader elected
+ TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 0)
+ val assignment = zkUtils.getReplicaAssignmentForTopics(Seq(topic)).map { case (topicPartition, replicas) =>
+ topicPartition.partition -> replicas
+ }
+ val brokerMetadatas = AdminUtils.getBrokerMetadatas(zkUtils, RackAwareMode.Enforced)
+ val expectedMap = Map(0 -> "0", 1 -> "0", 2 -> "1", 3 -> "1")
+ assertEquals(expectedMap, brokerMetadatas.map(b => b.id -> b.rack.get).toMap)
+ checkReplicaDistribution(assignment, expectedMap, numServers, numPartitions, replicationFactor)
+ } finally producer.close()
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/kafka/blob/951e30ad/core/src/test/scala/unit/kafka/admin/AdminRackAwareTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AdminRackAwareTest.scala b/core/src/test/scala/unit/kafka/admin/AdminRackAwareTest.scala
new file mode 100644
index 0000000..27ff4d4
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/admin/AdminRackAwareTest.scala
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.admin
+
+import kafka.utils.{Logging, TestUtils}
+import kafka.zk.ZooKeeperTestHarness
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.{Map, Seq}
+
+class AdminRackAwareTest extends RackAwareTest with Logging {
+
+ @Test
+ def testGetRackAlternatedBrokerListAndAssignReplicasToBrokers() {
+ // Alternated list interleaves brokers across racks: rack1=[0,5], rack2=[3,4], rack3=[1,2]
+ // yields 0 (rack1), 3 (rack2), 1 (rack3), 5 (rack1), 4 (rack2), 2 (rack3).
+ val rackMap = Map(0 -> "rack1", 1 -> "rack3", 2 -> "rack3", 3 -> "rack2", 4 -> "rack2", 5 -> "rack1")
+ val newList = AdminUtils.getRackAlternatedBrokerList(rackMap)
+ assertEquals(List(0, 3, 1, 5, 4, 2), newList)
+ // Removing broker 5 shrinks rack1 to [0]; alternation continues over the remaining brokers.
+ val anotherList = AdminUtils.getRackAlternatedBrokerList(rackMap - 5)
+ assertEquals(List(0, 3, 1, 4, 2), anotherList)
+ // Fixed start index and partition id (both 0) make the assignment deterministic.
+ val assignment = AdminUtils.assignReplicasToBrokers(toBrokerMetadata(rackMap), 7, 3, 0, 0)
+ val expected = Map(0 -> List(0, 3, 1),
+ 1 -> List(3, 1, 5),
+ 2 -> List(1, 5, 4),
+ 3 -> List(5, 4, 2),
+ 4 -> List(4, 2, 0),
+ 5 -> List(2, 0, 3),
+ 6 -> List(0, 4, 2))
+ assertEquals(expected, assignment)
+ }
+
+ @Test
+ def testAssignmentWithRackAware() {
+ // 6 brokers evenly spread over 3 racks; a fixed start index (2) keeps the run deterministic.
+ val brokerRackMapping = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack3", 4 -> "rack3", 5 -> "rack1")
+ val numPartitions = 6
+ val replicationFactor = 3
+ val assignment = AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions,
+ replicationFactor, 2, 0)
+ checkReplicaDistribution(assignment, brokerRackMapping, brokerRackMapping.size, numPartitions,
+ replicationFactor)
+ }
+
+ @Test
+ def testAssignmentWithRackAwareWithRandomStartIndex() {
+ // Same layout but with the default (random) start index; distribution properties must still hold.
+ val brokerRackMapping = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack3", 4 -> "rack3", 5 -> "rack1")
+ val numPartitions = 6
+ val replicationFactor = 3
+ val assignment = AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions,
+ replicationFactor)
+ checkReplicaDistribution(assignment, brokerRackMapping, brokerRackMapping.size, numPartitions,
+ replicationFactor)
+ }
+
+ @Test
+ def testAssignmentWithRackAwareWithUnevenReplicas() {
+ // 13 partitions over 6 brokers cannot balance leaders/replicas evenly, so only rack
+ // awareness is verified here; leader and replica count checks are disabled.
+ val brokerRackMapping = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack3", 4 -> "rack3", 5 -> "rack1")
+ val numPartitions = 13
+ val replicationFactor = 3
+ val assignment = AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions,
+ replicationFactor, 0, 0)
+ checkReplicaDistribution(assignment, brokerRackMapping, brokerRackMapping.size, numPartitions,
+ replicationFactor, verifyLeaderDistribution = false, verifyReplicasDistribution = false)
+ }
+
+ @Test
+ def testAssignmentWithRackAwareWithUnevenRacks() {
+ // rack1 holds 3 brokers vs 2 and 1 for the others, so per-broker replica counts
+ // cannot be equal; replica-count verification is therefore skipped.
+ val brokerRackMapping = Map(0 -> "rack1", 1 -> "rack1", 2 -> "rack2", 3 -> "rack3", 4 -> "rack3", 5 -> "rack1")
+ val numPartitions = 12
+ val replicationFactor = 3
+ val assignment = AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions,
+ replicationFactor)
+ checkReplicaDistribution(assignment, brokerRackMapping, brokerRackMapping.size, numPartitions,
+ replicationFactor, verifyReplicasDistribution = false)
+ }
+
+ @Test
+ def testAssignmentWith2ReplicasRackAware() {
+ // Replication factor smaller than the number of racks (2 < 3).
+ val brokerRackMapping = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack3", 4 -> "rack3", 5 -> "rack1")
+ val numPartitions = 12
+ val replicationFactor = 2
+ val assignment = AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions,
+ replicationFactor)
+ checkReplicaDistribution(assignment, brokerRackMapping, brokerRackMapping.size, numPartitions,
+ replicationFactor)
+ }
+
+ @Test
+ def testRackAwareExpansion() {
+ // Simulates adding partitions to an existing topic: broker ids start at 6 and
+ // startPartitionId = 12 continues numbering after the pre-existing partitions.
+ val brokerRackMapping = Map(6 -> "rack1", 7 -> "rack2", 8 -> "rack2", 9 -> "rack3", 10 -> "rack3", 11 -> "rack1")
+ val numPartitions = 12
+ val replicationFactor = 2
+ val assignment = AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions,
+ replicationFactor, startPartitionId = 12)
+ checkReplicaDistribution(assignment, brokerRackMapping, brokerRackMapping.size, numPartitions,
+ replicationFactor)
+ }
+
+ @Test
+ def testAssignmentWith2ReplicasRackAwareWith6Partitions() {
+ // Partitions equal to broker count with replication factor 2.
+ val brokerRackMapping = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack3", 4 -> "rack3", 5 -> "rack1")
+ val numPartitions = 6
+ val replicationFactor = 2
+ val assignment = AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions,
+ replicationFactor)
+ checkReplicaDistribution(assignment, brokerRackMapping, brokerRackMapping.size, numPartitions,
+ replicationFactor)
+ }
+
+ @Test
+ def testAssignmentWith2ReplicasRackAwareWith6PartitionsAnd3Brokers() {
+ // Non-contiguous broker ids (0, 1, 4), one per rack.
+ val brokerRackMapping = Map(0 -> "rack1", 1 -> "rack2", 4 -> "rack3")
+ val numPartitions = 3
+ val replicationFactor = 2
+ val assignment = AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, replicationFactor)
+ checkReplicaDistribution(assignment, brokerRackMapping, brokerRackMapping.size, numPartitions, replicationFactor)
+ }
+
+ @Test
+ def testLargeNumberPartitionsAssignment() {
+ // Scales up to 96 partitions over 12 brokers on 3 racks (4 brokers per rack).
+ val numPartitions = 96
+ val replicationFactor = 3
+ val brokerRackMapping = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack3", 4 -> "rack3", 5 -> "rack1",
+ 6 -> "rack1", 7 -> "rack2", 8 -> "rack2", 9 -> "rack3", 10 -> "rack1", 11 -> "rack3")
+ val assignment = AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions,
+ replicationFactor)
+ checkReplicaDistribution(assignment, brokerRackMapping, brokerRackMapping.size, numPartitions,
+ replicationFactor)
+ }
+
+ @Test
+ def testMoreReplicasThanRacks() {
+ // 5 replicas but only 3 racks: each partition must still touch all 3 racks.
+ val numPartitions = 6
+ val replicationFactor = 5
+ val brokerRackMapping = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack3", 4 -> "rack3", 5 -> "rack2")
+ val assignment = AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, replicationFactor)
+ assertEquals(List.fill(assignment.size)(replicationFactor), assignment.values.map(_.size))
+ val distribution = getReplicaDistribution(assignment, brokerRackMapping)
+ for (partition <- 0 until numPartitions)
+ assertEquals(3, distribution.partitionRacks(partition).toSet.size)
+ }
+
+ @Test
+ def testLessReplicasThanRacks() {
+ // 2 replicas over 3 racks: each partition's replicas must land on 2 distinct racks.
+ val numPartitions = 6
+ val replicationFactor = 2
+ val brokerRackMapping = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack3", 4 -> "rack3", 5 -> "rack2")
+ val assignment = AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions,
+ replicationFactor)
+ assertEquals(List.fill(assignment.size)(replicationFactor), assignment.values.map(_.size))
+ val distribution = getReplicaDistribution(assignment, brokerRackMapping)
+ for (partition <- 0 to 5)
+ assertEquals(2, distribution.partitionRacks(partition).toSet.size)
+ }
+
+ @Test
+ def testSingleRack() {
+ // Degenerate case: all brokers share one rack, so every partition stays on that rack
+ // and leadership is still spread one-per-broker.
+ val numPartitions = 6
+ val replicationFactor = 3
+ val brokerRackMapping = Map(0 -> "rack1", 1 -> "rack1", 2 -> "rack1", 3 -> "rack1", 4 -> "rack1", 5 -> "rack1")
+ val assignment = AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, replicationFactor)
+ assertEquals(List.fill(assignment.size)(replicationFactor), assignment.values.map(_.size))
+ val distribution = getReplicaDistribution(assignment, brokerRackMapping)
+ for (partition <- 0 until numPartitions)
+ assertEquals(1, distribution.partitionRacks(partition).toSet.size)
+ for (broker <- brokerRackMapping.keys)
+ assertEquals(1, distribution.brokerLeaderCount(broker))
+ }
+
+ @Test
+ def testSkipBrokerWithReplicaAlreadyAssigned() {
+ // Replication factor 4 with rack "a" holding 3 of 5 brokers forces repeated rack
+ // visits; only that assignment completes is asserted (all distribution checks off).
+ val rackInfo = Map(0 -> "a", 1 -> "b", 2 -> "c", 3 -> "a", 4 -> "a")
+ val brokerList = 0 to 4
+ val numPartitions = 6
+ val replicationFactor = 4
+ val brokerMetadatas = toBrokerMetadata(rackInfo)
+ assertEquals(brokerList, brokerMetadatas.map(_.id))
+ val assignment = AdminUtils.assignReplicasToBrokers(brokerMetadatas, numPartitions, replicationFactor,
+ fixedStartIndex = 2)
+ checkReplicaDistribution(assignment, rackInfo, 5, 6, 4,
+ verifyRackAware = false, verifyLeaderDistribution = false, verifyReplicasDistribution = false)
+ }
+}