Posted to commits@kylin.apache.org by sh...@apache.org on 2018/07/19 07:39:07 UTC

[kylin] branch master updated: KYLIN-2933 fix compilation against the Kafka 1.0.0 release

This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new 27eb8cd  KYLIN-2933 fix compilation against the Kafka 1.0.0 release
27eb8cd is described below

commit 27eb8cd2b8be7cfbb4345f3bef782587248cce18
Author: GinaZhai <na...@kyligence.io>
AuthorDate: Tue Jul 17 11:03:18 2018 +0800

    KYLIN-2933 fix compilation against the Kafka 1.0.0 release
    
    Upgrade the Kafka dependency from 0.11.0.1 to 1.0.0 and copy
    kafka.admin.AdminUtils into the kylin-it test sources so the
    integration tests keep compiling against the new client.
---
 kylin-it/pom.xml                                   |  41 ++
 .../src/test/scala/kafka/admin/AdminUtils.scala    | 788 +++++++++++++++++++++
 pom.xml                                            |   2 +-
 3 files changed, 830 insertions(+), 1 deletion(-)
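
For context, the copied kafka.admin.AdminUtils exposes the same ZooKeeper-based topic helpers
the integration tests already call. A minimal sketch of creating a topic through it, assuming
a local ZooKeeper at localhost:2181 and an illustrative topic name (neither is taken from this
commit):

    import kafka.admin.AdminUtils
    import kafka.utils.ZkUtils

    // (zkUrl, sessionTimeout ms, connectionTimeout ms, isZkSecurityEnabled)
    val zkUtils = ZkUtils("localhost:2181", 30000, 30000, false)
    try {
      // create the topic only if it is not already registered in ZooKeeper
      if (!AdminUtils.topicExists(zkUtils, "kylin_streaming_topic"))
        AdminUtils.createTopic(zkUtils, "kylin_streaming_topic", partitions = 3, replicationFactor = 1)
    } finally {
      zkUtils.close()
    }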

diff --git a/kylin-it/pom.xml b/kylin-it/pom.xml
index 3e528c1..16bedb5 100644
--- a/kylin-it/pom.xml
+++ b/kylin-it/pom.xml
@@ -303,8 +303,49 @@
                 </exclusion>
             </exclusions>
         </dependency>
+
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-library</artifactId>
+            <version>2.11.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-compiler</artifactId>
+            <version>2.11.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-reflect</artifactId>
+            <version>2.11.0</version>
+        </dependency>
     </dependencies>
 
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>net.alchim31.maven</groupId>
+                <artifactId>scala-maven-plugin</artifactId>
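+                <!-- Run the Scala compiler ahead of the default Java compile so that
+                     mixed Java/Scala sources (including the copied AdminUtils.scala
+                     under src/test/scala) are available on the javac classpath. -->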
+                <executions>
+                    <execution>
+                        <id>scala-compile-first</id>
+                        <phase>process-resources</phase>
+                        <goals>
+                            <goal>add-source</goal>
+                            <goal>compile</goal>
+                        </goals>
+                    </execution>
+                    <execution>
+                        <id>scala-test-compile</id>
+                        <phase>process-test-resources</phase>
+                        <goals>
+                            <goal>testCompile</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
 
     <profiles>
         <profile>
diff --git a/kylin-it/src/test/scala/kafka/admin/AdminUtils.scala b/kylin-it/src/test/scala/kafka/admin/AdminUtils.scala
new file mode 100644
index 0000000..1b019af
--- /dev/null
+++ b/kylin-it/src/test/scala/kafka/admin/AdminUtils.scala
@@ -0,0 +1,788 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.admin
+
+import kafka.cluster.Broker
+import kafka.log.LogConfig
+import kafka.server.{ConfigEntityName, ConfigType, DynamicConfig}
+import kafka.utils._
+import kafka.utils.ZkUtils._
+import java.util.Random
+import java.util.Properties
+
+import kafka.common.TopicAlreadyMarkedForDeletionException
+import org.apache.kafka.common.Node
+import org.apache.kafka.common.errors.{BrokerNotAvailableException, InvalidPartitionsException, InvalidReplicaAssignmentException, InvalidReplicationFactorException, InvalidTopicException, TopicExistsException, UnknownTopicOrPartitionException, ReplicaNotAvailableException, LeaderNotAvailableException}
+import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.requests.MetadataResponse
+
+import collection.{Map, Set, mutable, _}
+import scala.collection.JavaConverters._
+import org.I0Itec.zkclient.exception.ZkNodeExistsException
+import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.protocol.Errors
+
+import scala.collection.mutable.ListBuffer
+
+trait AdminUtilities {
+  def changeTopicConfig(zkUtils: ZkUtils, topic: String, configs: Properties)
+  def changeClientIdConfig(zkUtils: ZkUtils, clientId: String, configs: Properties)
+  def changeUserOrUserClientIdConfig(zkUtils: ZkUtils, sanitizedEntityName: String, configs: Properties)
+  def changeBrokerConfig(zkUtils: ZkUtils, brokerIds: Seq[Int], configs: Properties)
+
+  def changeConfigs(zkUtils: ZkUtils, entityType: String, entityName: String, configs: Properties): Unit = {
+
+    def parseBroker(broker: String): Int = {
+      try broker.toInt
+      catch {
+        case _: NumberFormatException =>
+          throw new IllegalArgumentException(s"Error parsing broker $broker. The broker's Entity Name must be a single integer value")
+      }
+    }
+
+    entityType match {
+      case ConfigType.Topic => changeTopicConfig(zkUtils, entityName, configs)
+      case ConfigType.Client => changeClientIdConfig(zkUtils, entityName, configs)
+      case ConfigType.User => changeUserOrUserClientIdConfig(zkUtils, entityName, configs)
+      case ConfigType.Broker => changeBrokerConfig(zkUtils, Seq(parseBroker(entityName)), configs)
+      case _ => throw new IllegalArgumentException(s"$entityType is not a known entityType. Should be one of ${ConfigType.Topic}, ${ConfigType.Client}, ${ConfigType.User}, ${ConfigType.Broker}")
+    }
+  }
+
+  def fetchEntityConfig(zkUtils: ZkUtils,entityType: String, entityName: String): Properties
+}
+
+object AdminUtils extends Logging with AdminUtilities {
+  val rand = new Random
+  val AdminClientId = "__admin_client"
+  val EntityConfigChangeZnodePrefix = "config_change_"
+
+  /**
+   * There are 3 goals of replica assignment:
+   *
+   * 1. Spread the replicas evenly among brokers.
+   * 2. For partitions assigned to a particular broker, their other replicas are spread over the other brokers.
+   * 3. If all brokers have rack information, assign the replicas for each partition to different racks if possible
+   *
+   * To achieve this goal for replica assignment without considering racks, we:
+   * 1. Assign the first replica of each partition by round-robin, starting from a random position in the broker list.
+   * 2. Assign the remaining replicas of each partition with an increasing shift.
+   *
+   * Here is an example of assigning 10 partitions to 5 brokers with a replication factor of 3:
+   * broker-0  broker-1  broker-2  broker-3  broker-4
+   * p0        p1        p2        p3        p4       (1st replica)
+   * p5        p6        p7        p8        p9       (1st replica)
+   * p4        p0        p1        p2        p3       (2nd replica)
+   * p8        p9        p5        p6        p7       (2nd replica)
+   * p3        p4        p0        p1        p2       (3rd replica)
+   * p7        p8        p9        p5        p6       (3rd replica)
+   *
+   * To create rack aware assignment, this API will first create a rack alternated broker list. For example,
+   * from this brokerID -> rack mapping:
+   *
+   * 0 -> "rack1", 1 -> "rack3", 2 -> "rack3", 3 -> "rack2", 4 -> "rack2", 5 -> "rack1"
+   *
+   * The rack alternated list will be:
+   *
+   * 0, 3, 1, 5, 4, 2
+   *
+   * Then an easy round-robin assignment can be applied. Assume 6 partitions with replication factor of 3, the assignment
+   * will be:
+   *
+   * 0 -> 0,3,1
+   * 1 -> 3,1,5
+   * 2 -> 1,5,4
+   * 3 -> 5,4,2
+   * 4 -> 4,2,0
+   * 5 -> 2,0,3
+   *
+   * Once it has completed the first round-robin, if there are more partitions to assign, the algorithm will start
+   * shifting the followers. This is to ensure we will not always get the same set of sequences.
+   * In this case, if there is another partition to assign (partition #6), the assignment will be:
+   *
+   * 6 -> 0,4,2 (instead of repeating 0,3,1 as partition 0)
+   *
+   * The rack aware assignment always chooses the 1st replica of the partition using round robin on the rack alternated
+   * broker list. For the rest of the replicas, it will be biased towards brokers on racks that do not have
+   * any replica assignment, until every rack has a replica. Then the assignment will go back to round-robin on
+   * the broker list.
+   *
+   * As a result, if the number of replicas is equal to or greater than the number of racks, it will ensure that
+   * each rack will get at least one replica. Otherwise, each rack will get at most one replica. In a perfect
+   * situation where the number of replicas is the same as the number of racks and each rack has the same number of
+   * brokers, it guarantees that the replica distribution is even across brokers and racks.
+   *
+   * @return a Map from partition id to replica ids
+   * @throws AdminOperationException If rack information is supplied but it is incomplete, or if it is not possible to
+   *                                 assign each replica to a unique rack.
+   *
+   */
+  def assignReplicasToBrokers(brokerMetadatas: Seq[BrokerMetadata],
+                              nPartitions: Int,
+                              replicationFactor: Int,
+                              fixedStartIndex: Int = -1,
+                              startPartitionId: Int = -1): Map[Int, Seq[Int]] = {
+    if (nPartitions <= 0)
+      throw new InvalidPartitionsException("Number of partitions must be larger than 0.")
+    if (replicationFactor <= 0)
+      throw new InvalidReplicationFactorException("Replication factor must be larger than 0.")
+    if (replicationFactor > brokerMetadatas.size)
+      throw new InvalidReplicationFactorException(s"Replication factor: $replicationFactor larger than available brokers: ${brokerMetadatas.size}.")
+    if (brokerMetadatas.forall(_.rack.isEmpty))
+      assignReplicasToBrokersRackUnaware(nPartitions, replicationFactor, brokerMetadatas.map(_.id), fixedStartIndex,
+        startPartitionId)
+    else {
+      if (brokerMetadatas.exists(_.rack.isEmpty))
+        throw new AdminOperationException("Not all brokers have rack information for replica rack aware assignment.")
+      assignReplicasToBrokersRackAware(nPartitions, replicationFactor, brokerMetadatas, fixedStartIndex,
+        startPartitionId)
+    }
+  }
+
+  private def assignReplicasToBrokersRackUnaware(nPartitions: Int,
+                                                 replicationFactor: Int,
+                                                 brokerList: Seq[Int],
+                                                 fixedStartIndex: Int,
+                                                 startPartitionId: Int): Map[Int, Seq[Int]] = {
+    val ret = mutable.Map[Int, Seq[Int]]()
+    val brokerArray = brokerList.toArray
+    val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
+    var currentPartitionId = math.max(0, startPartitionId)
+    var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
+    for (_ <- 0 until nPartitions) {
+      if (currentPartitionId > 0 && (currentPartitionId % brokerArray.length == 0))
+        nextReplicaShift += 1
+      val firstReplicaIndex = (currentPartitionId + startIndex) % brokerArray.length
+      val replicaBuffer = mutable.ArrayBuffer(brokerArray(firstReplicaIndex))
+      for (j <- 0 until replicationFactor - 1)
+        replicaBuffer += brokerArray(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerArray.length))
+      ret.put(currentPartitionId, replicaBuffer)
+      currentPartitionId += 1
+    }
+    ret
+  }
+
+  private def assignReplicasToBrokersRackAware(nPartitions: Int,
+                                               replicationFactor: Int,
+                                               brokerMetadatas: Seq[BrokerMetadata],
+                                               fixedStartIndex: Int,
+                                               startPartitionId: Int): Map[Int, Seq[Int]] = {
+    val brokerRackMap = brokerMetadatas.collect { case BrokerMetadata(id, Some(rack)) =>
+      id -> rack
+    }.toMap
+    val numRacks = brokerRackMap.values.toSet.size
+    val arrangedBrokerList = getRackAlternatedBrokerList(brokerRackMap)
+    val numBrokers = arrangedBrokerList.size
+    val ret = mutable.Map[Int, Seq[Int]]()
+    val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(arrangedBrokerList.size)
+    var currentPartitionId = math.max(0, startPartitionId)
+    var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(arrangedBrokerList.size)
+    for (_ <- 0 until nPartitions) {
+      if (currentPartitionId > 0 && (currentPartitionId % arrangedBrokerList.size == 0))
+        nextReplicaShift += 1
+      val firstReplicaIndex = (currentPartitionId + startIndex) % arrangedBrokerList.size
+      val leader = arrangedBrokerList(firstReplicaIndex)
+      val replicaBuffer = mutable.ArrayBuffer(leader)
+      val racksWithReplicas = mutable.Set(brokerRackMap(leader))
+      val brokersWithReplicas = mutable.Set(leader)
+      var k = 0
+      for (_ <- 0 until replicationFactor - 1) {
+        var done = false
+        while (!done) {
+          val broker = arrangedBrokerList(replicaIndex(firstReplicaIndex, nextReplicaShift * numRacks, k, arrangedBrokerList.size))
+          val rack = brokerRackMap(broker)
+          // Skip this broker if
+          // 1. there is already a replica on a broker in the same rack AND one or more racks still have
+          //    no replica, or
+          // 2. the broker has already been assigned a replica AND one or more brokers still have no replica
+          if ((!racksWithReplicas.contains(rack) || racksWithReplicas.size == numRacks)
+              && (!brokersWithReplicas.contains(broker) || brokersWithReplicas.size == numBrokers)) {
+            replicaBuffer += broker
+            racksWithReplicas += rack
+            brokersWithReplicas += broker
+            done = true
+          }
+          k += 1
+        }
+      }
+      ret.put(currentPartitionId, replicaBuffer)
+      currentPartitionId += 1
+    }
+    ret
+  }
+
+  def fetchTopicMetadataFromZk(topic: String, zkUtils: ZkUtils): MetadataResponse.TopicMetadata =
+    fetchTopicMetadataFromZk(topic, zkUtils, mutable.Map.empty[Int, Broker],
+      ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))
+
+  def fetchTopicMetadataFromZk(topics: Set[String], zkUtils: ZkUtils): Set[MetadataResponse.TopicMetadata] =
+    fetchTopicMetadataFromZk(topics, zkUtils, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))
+
+  def fetchTopicMetadataFromZk(topics: Set[String], zkUtils: ZkUtils, securityProtocol: SecurityProtocol): Set[MetadataResponse.TopicMetadata] =
+    fetchTopicMetadataFromZk(topics, zkUtils, ListenerName.forSecurityProtocol(securityProtocol))
+
+  def fetchTopicMetadataFromZk(topics: Set[String], zkUtils: ZkUtils, listenerName: ListenerName): Set[MetadataResponse.TopicMetadata] = {
+    val cachedBrokerInfo = mutable.Map.empty[Int, Broker]
+    topics.map(topic => fetchTopicMetadataFromZk(topic, zkUtils, cachedBrokerInfo, listenerName))
+  }
+
+  private def fetchTopicMetadataFromZk(topic: String,
+                                       zkUtils: ZkUtils,
+                                       cachedBrokerInfo: mutable.Map[Int, Broker],
+                                       listenerName: ListenerName): MetadataResponse.TopicMetadata = {
+    if (zkUtils.pathExists(getTopicPath(topic))) {
+      val topicPartitionAssignment = zkUtils.getPartitionAssignmentForTopics(List(topic))(topic)
+      val sortedPartitions = topicPartitionAssignment.toList.sortWith((m1, m2) => m1._1 < m2._1)
+      val partitionMetadata = sortedPartitions.map { partitionMap =>
+        val partition = partitionMap._1
+        val replicas = partitionMap._2
+        val inSyncReplicas = zkUtils.getInSyncReplicasForPartition(topic, partition)
+        val leader = zkUtils.getLeaderForPartition(topic, partition)
+        debug("replicas = " + replicas + ", in sync replicas = " + inSyncReplicas + ", leader = " + leader)
+
+        var leaderInfo: Node = Node.noNode()
+        var replicaInfo: Seq[Node] = Nil
+        var isrInfo: Seq[Node] = Nil
+        try {
+          leaderInfo = leader match {
+            case Some(l) =>
+              try {
+                getBrokerInfoFromCache(zkUtils, cachedBrokerInfo, List(l)).head.getNode(listenerName)
+              } catch {
+                case e: Throwable => throw new LeaderNotAvailableException("Leader not available for partition [%s,%d]".format(topic, partition), e)
+              }
+            case None => throw new LeaderNotAvailableException("No leader exists for partition " + partition)
+          }
+          try {
+            replicaInfo = getBrokerInfoFromCache(zkUtils, cachedBrokerInfo, replicas).map(_.getNode(listenerName))
+            isrInfo = getBrokerInfoFromCache(zkUtils, cachedBrokerInfo, inSyncReplicas).map(_.getNode(listenerName))
+          } catch {
+            case e: Throwable => throw new ReplicaNotAvailableException(e)
+          }
+          if (replicaInfo.size < replicas.size)
+            throw new ReplicaNotAvailableException("Replica information not available for following brokers: " +
+              replicas.filterNot(replicaInfo.map(_.id).contains(_)).mkString(","))
+          if (isrInfo.size < inSyncReplicas.size)
+            throw new ReplicaNotAvailableException("In Sync Replica information not available for following brokers: " +
+              inSyncReplicas.filterNot(isrInfo.map(_.id).contains(_)).mkString(","))
+          new MetadataResponse.PartitionMetadata(Errors.NONE, partition, leaderInfo, replicaInfo.asJava, isrInfo.asJava, null)
+        } catch {
+          case e: Throwable =>
+            debug("Error while fetching metadata for partition [%s,%d]".format(topic, partition), e)
+            new MetadataResponse.PartitionMetadata(Errors.forException(e), partition, leaderInfo, replicaInfo.asJava, isrInfo.asJava, null)
+        }
+      }
+      new MetadataResponse.TopicMetadata(Errors.NONE, topic, Topic.isInternal(topic), partitionMetadata.asJava)
+    } else {
+      // topic doesn't exist, send appropriate error code
+      new MetadataResponse.TopicMetadata(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, Topic.isInternal(topic), java.util.Collections.emptyList())
+    }
+  }
+
+  private def getBrokerInfoFromCache(zkUtils: ZkUtils,
+                                     cachedBrokerInfo: scala.collection.mutable.Map[Int, Broker],
+                                     brokerIds: Seq[Int]): Seq[Broker] = {
+    var failedBrokerIds: ListBuffer[Int] = new ListBuffer()
+    val brokerMetadata = brokerIds.map { id =>
+      val optionalBrokerInfo = cachedBrokerInfo.get(id)
+      optionalBrokerInfo match {
+        case Some(brokerInfo) => Some(brokerInfo) // return broker info from the cache
+        case None => // fetch it from zookeeper
+          zkUtils.getBrokerInfo(id) match {
+            case Some(brokerInfo) =>
+              cachedBrokerInfo += (id -> brokerInfo)
+              Some(brokerInfo)
+            case None =>
+              failedBrokerIds += id
+              None
+          }
+      }
+    }
+    brokerMetadata.filter(_.isDefined).map(_.get)
+  }
+
+  /**
+    * Given broker and rack information, returns a list of brokers alternated by the rack. Assume
+    * this is the rack and its brokers:
+    *
+    * rack1: 0, 1, 2
+    * rack2: 3, 4, 5
+    * rack3: 6, 7, 8
+    *
+    * This API would return the list of 0, 3, 6, 1, 4, 7, 2, 5, 8
+    *
+    * This is essential to make sure that the assignReplicasToBrokers API can use such list and
+    * assign replicas to brokers in a simple round-robin fashion, while ensuring an even
+    * distribution of leader and replica counts on each broker and that replicas are
+    * distributed to all racks.
+    */
+  private[admin] def getRackAlternatedBrokerList(brokerRackMap: Map[Int, String]): IndexedSeq[Int] = {
+    val brokersIteratorByRack = getInverseMap(brokerRackMap).map { case (rack, brokers) =>
+      (rack, brokers.toIterator)
+    }
+    val racks = brokersIteratorByRack.keys.toArray.sorted
+    val result = new mutable.ArrayBuffer[Int]
+    var rackIndex = 0
+    while (result.size < brokerRackMap.size) {
+      val rackIterator = brokersIteratorByRack(racks(rackIndex))
+      if (rackIterator.hasNext)
+        result += rackIterator.next()
+      rackIndex = (rackIndex + 1) % racks.length
+    }
+    result
+  }
+
+  private[admin] def getInverseMap(brokerRackMap: Map[Int, String]): Map[String, Seq[Int]] = {
+    brokerRackMap.toSeq.map { case (id, rack) => (rack, id) }
+      .groupBy { case (rack, _) => rack }
+      .map { case (rack, rackAndIdList) => (rack, rackAndIdList.map { case (_, id) => id }.sorted) }
+  }
+ /**
+  * Add partitions to existing topic with optional replica assignment
+  *
+  * @param zkUtils Zookeeper utilities
+  * @param topic Topic for adding partitions to
+  * @param existingAssignment A map from partition id to its assigned replicas
+  * @param allBrokers All brokers in the cluster
+  * @param numPartitions Number of partitions to be set
+  * @param replicaAssignment Manual replica assignment, or none
+  * @param validateOnly If true, validate the parameters without actually adding the partitions
+  * @return the updated replica assignment
+  */
+  def addPartitions(zkUtils: ZkUtils,
+                    topic: String,
+                    existingAssignment: Map[Int, Seq[Int]],
+                    allBrokers: Seq[BrokerMetadata],
+                    numPartitions: Int = 1,
+                    replicaAssignment: Option[Map[Int, Seq[Int]]] = None,
+                    validateOnly: Boolean = false): Map[Int, Seq[Int]] = {
+    val existingAssignmentPartition0 = existingAssignment.getOrElse(0,
+      throw new AdminOperationException(
+        s"Unexpected existing replica assignment for topic '$topic', partition id 0 is missing. " +
+          s"Assignment: $existingAssignment"))
+
+    val partitionsToAdd = numPartitions - existingAssignment.size
+    if (partitionsToAdd <= 0)
+      throw new InvalidPartitionsException(
+        s"The number of partitions for a topic can only be increased. " +
+          s"Topic $topic currently has ${existingAssignment.size} partitions, " +
+          s"$numPartitions would not be an increase.")
+
+    replicaAssignment.foreach { proposedReplicaAssignment =>
+      validateReplicaAssignment(proposedReplicaAssignment, existingAssignmentPartition0,
+        allBrokers.map(_.id).toSet)
+    }
+
+    val proposedAssignmentForNewPartitions = replicaAssignment.getOrElse {
+      val startIndex = math.max(0, allBrokers.indexWhere(_.id >= existingAssignmentPartition0.head))
+      AdminUtils.assignReplicasToBrokers(allBrokers, partitionsToAdd, existingAssignmentPartition0.size,
+        startIndex, existingAssignment.size)
+    }
+    val proposedAssignment = existingAssignment ++ proposedAssignmentForNewPartitions
+    if (!validateOnly) {
+      info(s"Creating $partitionsToAdd partitions for '$topic' with the following replica assignment: " +
+        s"$proposedAssignmentForNewPartitions.")
+      // add the combined new list
+      AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, proposedAssignment, update = true)
+    }
+    proposedAssignment
+
+  }
+
+  /**
+    * Parse a replica assignment string of the form:
+    * {{{
+    * broker_id_for_part1_replica1:broker_id_for_part1_replica2,
+    * broker_id_for_part2_replica1:broker_id_for_part2_replica2,
+    * ...
+    * }}}
+    */
+  def parseReplicaAssignment(replicaAssignmentsString: String, startPartitionId: Int): Map[Int, Seq[Int]] = {
+    val assignmentStrings = replicaAssignmentsString.split(",")
+    val assignmentMap = mutable.Map[Int, Seq[Int]]()
+    var partitionId = startPartitionId
+    for (assignmentString <- assignmentStrings) {
+      val brokerIds = assignmentString.split(":").map(_.trim.toInt).toSeq
+      assignmentMap.put(partitionId, brokerIds)
+      partitionId = partitionId + 1
+    }
+    assignmentMap
+  }
+
+  private def validateReplicaAssignment(replicaAssignment: Map[Int, Seq[Int]],
+                                        existingAssignmentPartition0: Seq[Int],
+                                        availableBrokerIds: Set[Int]): Unit = {
+
+    replicaAssignment.foreach { case (partitionId, replicas) =>
+      if (replicas.isEmpty)
+        throw new InvalidReplicaAssignmentException(
+          s"Cannot have replication factor of 0 for partition id $partitionId.")
+      if (replicas.size != replicas.toSet.size)
+        throw new InvalidReplicaAssignmentException(
+          s"Duplicate brokers not allowed in replica assignment: " +
+            s"${replicas.mkString(", ")} for partition id $partitionId.")
+      if (!replicas.toSet.subsetOf(availableBrokerIds))
+        throw new BrokerNotAvailableException(
+          s"Some brokers specified for partition id $partitionId are not available. " +
+            s"Specified brokers: ${replicas.mkString(", ")}, " +
+            s"available brokers: ${availableBrokerIds.mkString(", ")}.")
+      partitionId -> replicas.size
+    }
+    val badRepFactors = replicaAssignment.collect {
+      case (partition, replicas) if replicas.size != existingAssignmentPartition0.size => partition -> replicas.size
+    }
+    if (badRepFactors.nonEmpty) {
+      val sortedBadRepFactors = badRepFactors.toSeq.sortBy { case (partitionId, _) => partitionId }
+      val partitions = sortedBadRepFactors.map { case (partitionId, _) => partitionId }
+      val repFactors = sortedBadRepFactors.map { case (_, rf) => rf }
+      throw new InvalidReplicaAssignmentException(s"Inconsistent replication factor between partitions, " +
+        s"partition 0 has ${existingAssignmentPartition0.size} while partitions [${partitions.mkString(", ")}] have " +
+        s"replication factors [${repFactors.mkString(", ")}], respectively.")
+    }
+  }
+
+  def deleteTopic(zkUtils: ZkUtils, topic: String) {
+    if (topicExists(zkUtils, topic)) {
+      try {
+        zkUtils.createPersistentPath(getDeleteTopicPath(topic))
+      } catch {
+        case _: ZkNodeExistsException => throw new TopicAlreadyMarkedForDeletionException(
+          "topic %s is already marked for deletion".format(topic))
+        case e2: Throwable => throw new AdminOperationException(e2)
+      }
+    } else {
+      throw new UnknownTopicOrPartitionException(s"Topic `$topic` to delete does not exist")
+    }
+  }
+
+  @deprecated("This method has been deprecated and will be removed in a future release.", "0.11.0.0")
+  def isConsumerGroupActive(zkUtils: ZkUtils, group: String) = {
+    zkUtils.getConsumersInGroup(group).nonEmpty
+  }
+
+  /**
+   * Delete the whole directory of the given consumer group if the group is inactive.
+   *
+   * @param zkUtils Zookeeper utilities
+   * @param group Consumer group
+   * @return whether or not we deleted the consumer group information
+   */
+  @deprecated("This method has been deprecated and will be removed in a future release.", "0.11.0.0")
+  def deleteConsumerGroupInZK(zkUtils: ZkUtils, group: String) = {
+    if (!isConsumerGroupActive(zkUtils, group)) {
+      val dir = new ZKGroupDirs(group)
+      zkUtils.deletePathRecursive(dir.consumerGroupDir)
+      true
+    }
+    else false
+  }
+
+  /**
+   * Delete the given consumer group's information for the given topic in Zookeeper if the group is inactive.
+   * If the consumer group consumes no other topics, delete the whole consumer group directory.
+   *
+   * @param zkUtils Zookeeper utilities
+   * @param group Consumer group
+   * @param topic Topic of the consumer group information we wish to delete
+   * @return whether or not we deleted the consumer group information for the given topic
+   */
+  @deprecated("This method has been deprecated and will be removed in a future release.", "0.11.0.0")
+  def deleteConsumerGroupInfoForTopicInZK(zkUtils: ZkUtils, group: String, topic: String) = {
+    val topics = zkUtils.getTopicsByConsumerGroup(group)
+    if (topics == Seq(topic)) {
+      deleteConsumerGroupInZK(zkUtils, group)
+    }
+    else if (!isConsumerGroupActive(zkUtils, group)) {
+      val dir = new ZKGroupTopicDirs(group, topic)
+      zkUtils.deletePathRecursive(dir.consumerOwnerDir)
+      zkUtils.deletePathRecursive(dir.consumerOffsetDir)
+      true
+    }
+    else false
+  }
+
+  /**
+   * Delete every inactive consumer group's information about the given topic in Zookeeper.
+   *
+   * @param zkUtils Zookeeper utilities
+   * @param topic Topic of the consumer group information we wish to delete
+   */
+  @deprecated("This method has been deprecated and will be removed in a future release.", "0.11.0.0")
+  def deleteAllConsumerGroupInfoForTopicInZK(zkUtils: ZkUtils, topic: String) {
+    val groups = zkUtils.getAllConsumerGroupsForTopic(topic)
+    groups.foreach(group => deleteConsumerGroupInfoForTopicInZK(zkUtils, group, topic))
+  }
+
+  def topicExists(zkUtils: ZkUtils, topic: String): Boolean =
+    zkUtils.pathExists(getTopicPath(topic))
+
+  def getBrokerMetadatas(zkUtils: ZkUtils, rackAwareMode: RackAwareMode = RackAwareMode.Enforced,
+                         brokerList: Option[Seq[Int]] = None): Seq[BrokerMetadata] = {
+    val allBrokers = zkUtils.getAllBrokersInCluster()
+    val brokers = brokerList.map(brokerIds => allBrokers.filter(b => brokerIds.contains(b.id))).getOrElse(allBrokers)
+    val brokersWithRack = brokers.filter(_.rack.nonEmpty)
+    if (rackAwareMode == RackAwareMode.Enforced && brokersWithRack.nonEmpty && brokersWithRack.size < brokers.size) {
+      throw new AdminOperationException("Not all brokers have rack information. Add --disable-rack-aware in command line" +
+        " to make replica assignment without rack information.")
+    }
+    val brokerMetadatas = rackAwareMode match {
+      case RackAwareMode.Disabled => brokers.map(broker => BrokerMetadata(broker.id, None))
+      case RackAwareMode.Safe if brokersWithRack.size < brokers.size =>
+        brokers.map(broker => BrokerMetadata(broker.id, None))
+      case _ => brokers.map(broker => BrokerMetadata(broker.id, broker.rack))
+    }
+    brokerMetadatas.sortBy(_.id)
+  }
+
+  def createTopic(zkUtils: ZkUtils,
+                  topic: String,
+                  partitions: Int,
+                  replicationFactor: Int,
+                  topicConfig: Properties = new Properties,
+                  rackAwareMode: RackAwareMode = RackAwareMode.Enforced) {
+    val brokerMetadatas = getBrokerMetadatas(zkUtils, rackAwareMode)
+    val replicaAssignment = AdminUtils.assignReplicasToBrokers(brokerMetadatas, partitions, replicationFactor)
+    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, replicaAssignment, topicConfig)
+  }
+
+  def validateCreateOrUpdateTopic(zkUtils: ZkUtils,
+                                  topic: String,
+                                  partitionReplicaAssignment: Map[Int, Seq[Int]],
+                                  config: Properties,
+                                  update: Boolean): Unit = {
+    // validate arguments
+    Topic.validate(topic)
+
+    if (!update) {
+      if (topicExists(zkUtils, topic))
+        throw new TopicExistsException(s"Topic '$topic' already exists.")
+      else if (Topic.hasCollisionChars(topic)) {
+        val allTopics = zkUtils.getAllTopics()
+        // check again in case the topic was created in the meantime, otherwise the
+        // topic could potentially collide with itself
+        if (allTopics.contains(topic))
+          throw new TopicExistsException(s"Topic '$topic' already exists.")
+        val collidingTopics = allTopics.filter(Topic.hasCollision(topic, _))
+        if (collidingTopics.nonEmpty) {
+          throw new InvalidTopicException(s"Topic '$topic' collides with existing topics: ${collidingTopics.mkString(", ")}")
+        }
+      }
+    }
+
+    if (partitionReplicaAssignment.values.map(_.size).toSet.size != 1)
+      throw new InvalidReplicaAssignmentException("All partitions should have the same number of replicas")
+
+    partitionReplicaAssignment.values.foreach(reps =>
+      if (reps.size != reps.toSet.size)
+        throw new InvalidReplicaAssignmentException("Duplicate replica assignment found: " + partitionReplicaAssignment)
+    )
+
+
+    // Configs only matter if a topic is being created. Changing configs via AlterTopic is not supported
+    if (!update)
+      LogConfig.validate(config)
+  }
+
+  def createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils: ZkUtils,
+                                                     topic: String,
+                                                     partitionReplicaAssignment: Map[Int, Seq[Int]],
+                                                     config: Properties = new Properties,
+                                                     update: Boolean = false) {
+    validateCreateOrUpdateTopic(zkUtils, topic, partitionReplicaAssignment, config, update)
+
+    // Configs only matter if a topic is being created. Changing configs via AlterTopic is not supported
+    if (!update) {
+      // write out the config if there is any, this isn't transactional with the partition assignments
+      writeEntityConfig(zkUtils, getEntityConfigPath(ConfigType.Topic, topic), config)
+    }
+
+    // create the partition assignment
+    writeTopicPartitionAssignment(zkUtils, topic, partitionReplicaAssignment, update)
+  }
+
+  private def writeTopicPartitionAssignment(zkUtils: ZkUtils, topic: String, replicaAssignment: Map[Int, Seq[Int]], update: Boolean) {
+    try {
+      val zkPath = getTopicPath(topic)
+      val jsonPartitionData = zkUtils.replicaAssignmentZkData(replicaAssignment.map(e => e._1.toString -> e._2))
+
+      if (!update) {
+        info("Topic creation " + jsonPartitionData.toString)
+        zkUtils.createPersistentPath(zkPath, jsonPartitionData)
+      } else {
+        info("Topic update " + jsonPartitionData.toString)
+        zkUtils.updatePersistentPath(zkPath, jsonPartitionData)
+      }
+      debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionData))
+    } catch {
+      case _: ZkNodeExistsException => throw new TopicExistsException(s"Topic '$topic' already exists.")
+      case e2: Throwable => throw new AdminOperationException(e2.toString)
+    }
+  }
+
+  /**
+   * Update the config for a client and create a change notification so the change will propagate to other brokers.
+   * If clientId is <default>, default clientId config is updated. ClientId configs are used only if <user, clientId>
+   * and <user> configs are not specified.
+   *
+   * @param zkUtils Zookeeper utilities used to write the config to ZK
+   * @param sanitizedClientId: The sanitized clientId for which configs are being changed
+   * @param configs: The final set of configs that will be applied to the client. If any new configs need to be added or
+   *                 existing configs need to be deleted, it should be done prior to invoking this API
+   *
+   */
+  def changeClientIdConfig(zkUtils: ZkUtils, sanitizedClientId: String, configs: Properties) {
+    DynamicConfig.Client.validate(configs)
+    changeEntityConfig(zkUtils, ConfigType.Client, sanitizedClientId, configs)
+  }
+
+  /**
+   * Update the config for a <user> or <user, clientId> and create a change notification so the change will propagate to other brokers.
+   * User and/or clientId components of the path may be <default>, indicating that the configuration is the default
+   * value to be applied if a more specific override is not configured.
+   *
+   * @param zkUtils Zookeeper utilities used to write the config to ZK
+   * @param sanitizedEntityName: <sanitizedUserPrincipal> or <sanitizedUserPrincipal>/clients/<clientId>
+   * @param configs: The final set of configs that will be applied to the entity. If any new configs need to be added or
+   *                 existing configs need to be deleted, it should be done prior to invoking this API
+   *
+   */
+  def changeUserOrUserClientIdConfig(zkUtils: ZkUtils, sanitizedEntityName: String, configs: Properties) {
+    if (sanitizedEntityName == ConfigEntityName.Default || sanitizedEntityName.contains("/clients"))
+      DynamicConfig.Client.validate(configs)
+    else
+      DynamicConfig.User.validate(configs)
+    changeEntityConfig(zkUtils, ConfigType.User, sanitizedEntityName, configs)
+  }
+
+  def validateTopicConfig(zkUtils: ZkUtils, topic: String, configs: Properties): Unit = {
+    Topic.validate(topic)
+    if (!topicExists(zkUtils, topic))
+      throw new AdminOperationException("Topic \"%s\" does not exist.".format(topic))
+    // validate the topic-level config overrides
+    LogConfig.validate(configs)
+  }
+
+  /**
+   * Update the config for an existing topic and create a change notification so the change will propagate to other brokers
+   *
+   * @param zkUtils Zookeeper utilities used to write the config to ZK
+   * @param topic: The topic for which configs are being changed
+   * @param configs: The final set of configs that will be applied to the topic. If any new configs need to be added or
+   *                 existing configs need to be deleted, it should be done prior to invoking this API
+   *
+   */
+  def changeTopicConfig(zkUtils: ZkUtils, topic: String, configs: Properties) {
+    validateTopicConfig(zkUtils, topic, configs)
+    changeEntityConfig(zkUtils, ConfigType.Topic, topic, configs)
+  }
+
+  /**
+    * Override the broker config on some set of brokers. These overrides will be persisted between sessions, and will
+    * override any defaults entered in the broker's config files
+    *
+    * @param zkUtils: Zookeeper utilities used to write the config to ZK
+    * @param brokers: The list of brokers to apply config changes to
+    * @param configs: The config to change, as properties
+    */
+  def changeBrokerConfig(zkUtils: ZkUtils, brokers: Seq[Int], configs: Properties): Unit = {
+    DynamicConfig.Broker.validate(configs)
+    brokers.foreach { broker =>
+      changeEntityConfig(zkUtils, ConfigType.Broker, broker.toString, configs)
+    }
+  }
+
+  private def changeEntityConfig(zkUtils: ZkUtils, rootEntityType: String, fullSanitizedEntityName: String, configs: Properties) {
+    zkUtils.makeSurePersistentPathExists(ZkUtils.ConfigChangesPath)
+
+    val sanitizedEntityPath = rootEntityType + '/' + fullSanitizedEntityName
+    val entityConfigPath = getEntityConfigPath(rootEntityType, fullSanitizedEntityName)
+    // write the new config--may not exist if there were previously no overrides
+    writeEntityConfig(zkUtils, entityConfigPath, configs)
+
+    // create the change notification
+    val seqNode = ZkUtils.ConfigChangesPath + "/" + EntityConfigChangeZnodePrefix
+    val content = Json.encode(getConfigChangeZnodeData(sanitizedEntityPath))
+    zkUtils.createSequentialPersistentPath(seqNode, content)
+  }
+
+  def getConfigChangeZnodeData(sanitizedEntityPath: String) : Map[String, Any] = {
+    Map("version" -> 2, "entity_path" -> sanitizedEntityPath)
+  }
+
+  /**
+   * Write out the entity config to zk, if there is any
+   */
+  private def writeEntityConfig(zkUtils: ZkUtils, entityPath: String, config: Properties) {
+    val map = Map("version" -> 1, "config" -> config.asScala)
+    zkUtils.updatePersistentPath(entityPath, Json.encode(map))
+  }
+
+  /**
+   * Read the entity (topic, broker, client, user or <user, client>) config (if any) from zk
+   * sanitizedEntityName is <topic>, <broker>, <client-id>, <user> or <user>/clients/<client-id>.
+   */
+  def fetchEntityConfig(zkUtils: ZkUtils, rootEntityType: String, sanitizedEntityName: String): Properties = {
+    val entityConfigPath = getEntityConfigPath(rootEntityType, sanitizedEntityName)
+    // readDataMaybeNull returns Some(null) if the path exists, but there is no data
+    val str = zkUtils.readDataMaybeNull(entityConfigPath)._1.orNull
+    val props = new Properties()
+    if (str != null) {
+      Json.parseFull(str).foreach { jsValue =>
+        val jsObject = jsValue.asJsonObjectOption.getOrElse {
+          throw new IllegalArgumentException(s"Unexpected value in config: $str, entity_config_path: $entityConfigPath")
+        }
+        require(jsObject("version").to[Int] == 1)
+        val config = jsObject.get("config").flatMap(_.asJsonObjectOption).getOrElse {
+          throw new IllegalArgumentException(s"Invalid $entityConfigPath config: $str")
+        }
+        config.iterator.foreach { case (k, v) => props.setProperty(k, v.to[String]) }
+      }
+    }
+    props
+  }
+
+  def fetchAllTopicConfigs(zkUtils: ZkUtils): Map[String, Properties] =
+    zkUtils.getAllTopics().map(topic => (topic, fetchEntityConfig(zkUtils, ConfigType.Topic, topic))).toMap
+
+  def fetchAllEntityConfigs(zkUtils: ZkUtils, entityType: String): Map[String, Properties] =
+    zkUtils.getAllEntitiesWithConfig(entityType).map(entity => (entity, fetchEntityConfig(zkUtils, entityType, entity))).toMap
+
+  def fetchAllChildEntityConfigs(zkUtils: ZkUtils, rootEntityType: String, childEntityType: String): Map[String, Properties] = {
+    def entityPaths(zkUtils: ZkUtils, rootPath: Option[String]): Seq[String] = {
+      val root = rootPath match {
+        case Some(path) => rootEntityType + '/' + path
+        case None => rootEntityType
+      }
+      val entityNames = zkUtils.getAllEntitiesWithConfig(root)
+      rootPath match {
+        case Some(path) => entityNames.map(entityName => path + '/' + entityName)
+        case None => entityNames
+      }
+    }
+    entityPaths(zkUtils, None)
+      .flatMap(entity => entityPaths(zkUtils, Some(entity + '/' + childEntityType)))
+      .map(entityPath => (entityPath, fetchEntityConfig(zkUtils, rootEntityType, entityPath))).toMap
+  }
+
+  private def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
+    val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
+    (firstReplicaIndex + shift) % nBrokers
+  }
+
+}
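
The copied class also keeps the ZooKeeper-based deletion path; note that deleteTopic above only
marks the topic for deletion in ZooKeeper. A sketch of a test cleanup using it, reusing the
illustrative zkUtils handle and topic name from the earlier sketch:

    // mark the illustrative topic for deletion once the test run is finished
    if (AdminUtils.topicExists(zkUtils, "kylin_streaming_topic"))
      AdminUtils.deleteTopic(zkUtils, "kylin_streaming_topic")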
diff --git a/pom.xml b/pom.xml
index 854fd3b..e1ab370 100644
--- a/pom.xml
+++ b/pom.xml
@@ -56,7 +56,7 @@
         <hbase-hadoop2.version>1.1.1</hbase-hadoop2.version>
 
         <!-- Kafka versions -->
-        <kafka.version>0.11.0.1</kafka.version>
+        <kafka.version>1.0.0</kafka.version>
 
         <!-- Spark versions -->
         <spark.version>2.1.2</spark.version>
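
After this bump, the Kafka client version actually resolved by the integration-test module can be
checked with a standard Maven dependency listing from the repository root, e.g.
"mvn -pl kylin-it dependency:tree -Dincludes=org.apache.kafka".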