You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sr...@apache.org on 2016/02/17 03:25:22 UTC

kafka git commit: KAFKA-2508; Replace UpdateMetadata{Request,Response} with o.a.k.c.req…

Repository: kafka
Updated Branches:
  refs/heads/trunk 3ee1878d8 -> 5df56145a


KAFKA-2508; Replace UpdateMetadata{Request,Response} with o.a.k.c.req…

…uests equivalent

Author: Grant Henke <gr...@gmail.com>

Reviewers: Ismael Juma <is...@juma.me.uk>, Sriharsha Chintalapani <ma...@harsha.io>

Closes #896 from granthenke/update-metadata and squashes the following commits:

2eb5d59 [Grant Henke] Address reviews
497258d [Grant Henke] KAFKA-2508: Replace UpdateMetadata{Request,Response} with o.a.k.c.requests equivalent


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5df56145
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5df56145
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5df56145

Branch: refs/heads/trunk
Commit: 5df56145a51c78e8808046254ad4771d60957ac5
Parents: 3ee1878
Author: Grant Henke <gr...@gmail.com>
Authored: Tue Feb 16 18:25:00 2016 -0800
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Tue Feb 16 18:25:00 2016 -0800

----------------------------------------------------------------------
 .../scala/kafka/api/UpdateMetadataRequest.scala | 147 -------------------
 .../kafka/api/UpdateMetadataResponse.scala      |  42 ------
 .../scala/kafka/network/RequestChannel.scala    |   1 -
 .../src/main/scala/kafka/server/KafkaApis.scala |  19 ++-
 .../main/scala/kafka/server/MetadataCache.scala |  34 ++++-
 .../scala/kafka/server/ReplicaManager.scala     |   9 +-
 .../unit/kafka/admin/AddPartitionsTest.scala    |  87 ++++++-----
 .../api/RequestResponseSerializationTest.scala  |  23 +--
 .../integration/BaseTopicMetadataTest.scala     |  20 +--
 9 files changed, 95 insertions(+), 287 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/5df56145/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala b/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala
deleted file mode 100644
index f761125..0000000
--- a/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala
+++ /dev/null
@@ -1,147 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * 
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.api
-
-import java.nio.ByteBuffer
-
-import kafka.api.ApiUtils._
-import kafka.cluster.{Broker, BrokerEndPoint}
-import kafka.common.{KafkaException, TopicAndPartition}
-import kafka.network.{RequestOrResponseSend, RequestChannel}
-import kafka.network.RequestChannel.Response
-import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol}
-
-import scala.collection.Set
-
-object UpdateMetadataRequest {
-  val CurrentVersion = 1.shortValue
-  val IsInit: Boolean = true
-  val NotInit: Boolean = false
-  val DefaultAckTimeout: Int = 1000
-
-  def readFrom(buffer: ByteBuffer): UpdateMetadataRequest = {
-    val versionId = buffer.getShort
-    val correlationId = buffer.getInt
-    val clientId = readShortString(buffer)
-    val controllerId = buffer.getInt
-    val controllerEpoch = buffer.getInt
-    val partitionStateInfosCount = buffer.getInt
-    val partitionStateInfos = new collection.mutable.HashMap[TopicAndPartition, PartitionStateInfo]
-
-    for(i <- 0 until partitionStateInfosCount){
-      val topic = readShortString(buffer)
-      val partition = buffer.getInt
-      val partitionStateInfo = PartitionStateInfo.readFrom(buffer)
-
-      partitionStateInfos.put(TopicAndPartition(topic, partition), partitionStateInfo)
-    }
-
-    val numAliveBrokers = buffer.getInt
-
-    val aliveBrokers = versionId match {
-      case 0 => for(i <- 0 until numAliveBrokers) yield new Broker(BrokerEndPoint.readFrom(buffer),SecurityProtocol.PLAINTEXT)
-      case 1 => for(i <- 0 until numAliveBrokers) yield Broker.readFrom(buffer)
-      case v => throw new KafkaException( "Version " + v.toString + " is invalid for UpdateMetadataRequest. Valid versions are 0 or 1.")
-    }
-
-    new UpdateMetadataRequest(versionId, correlationId, clientId, controllerId, controllerEpoch,
-      partitionStateInfos.toMap, aliveBrokers.toSet)
-  }
-}
-
-case class UpdateMetadataRequest (versionId: Short,
-                                  correlationId: Int,
-                                  clientId: String,
-                                  controllerId: Int,
-                                  controllerEpoch: Int,
-                                  partitionStateInfos: Map[TopicAndPartition, PartitionStateInfo],
-                                  aliveBrokers: Set[Broker])
-  extends RequestOrResponse(Some(ApiKeys.UPDATE_METADATA_KEY.id)) {
-
-  def this(controllerId: Int, controllerEpoch: Int, correlationId: Int, clientId: String,
-           partitionStateInfos: Map[TopicAndPartition, PartitionStateInfo], aliveBrokers: Set[Broker]) = {
-    this(UpdateMetadataRequest.CurrentVersion, correlationId, clientId,
-      controllerId, controllerEpoch, partitionStateInfos, aliveBrokers)
-  }
-
-  def writeTo(buffer: ByteBuffer) {
-    buffer.putShort(versionId)
-    buffer.putInt(correlationId)
-    writeShortString(buffer, clientId)
-    buffer.putInt(controllerId)
-    buffer.putInt(controllerEpoch)
-    buffer.putInt(partitionStateInfos.size)
-    for((key, value) <- partitionStateInfos){
-      writeShortString(buffer, key.topic)
-      buffer.putInt(key.partition)
-      value.writeTo(buffer)
-    }
-    buffer.putInt(aliveBrokers.size)
-
-    versionId match {
-      case 0 => aliveBrokers.foreach(_.getBrokerEndPoint(SecurityProtocol.PLAINTEXT).writeTo(buffer))
-      case 1 => aliveBrokers.foreach(_.writeTo(buffer))
-      case v => throw new KafkaException( "Version " + v.toString + " is invalid for UpdateMetadataRequest. Valid versions are 0 or 1.")
-    }
-  }
-
-  def sizeInBytes(): Int = {
-    var size =
-      2 /* version id */ +
-        4 /* correlation id */ +
-        (2 + clientId.length) /* client id */ +
-        4 /* controller id */ +
-        4 /* controller epoch */ +
-        4 /* number of partitions */
-    for((key, value) <- partitionStateInfos)
-      size += (2 + key.topic.length) /* topic */ + 4 /* partition */ + value.sizeInBytes /* partition state info */
-    size += 4 /* number of alive brokers in the cluster */
-
-    versionId match  {
-      case 0 => for(broker <- aliveBrokers)
-        size += broker.getBrokerEndPoint(SecurityProtocol.PLAINTEXT).sizeInBytes /* broker info */
-      case 1 => for(broker  <- aliveBrokers)
-        size += broker.sizeInBytes
-      case v => throw new KafkaException( "Version " + v.toString + " is invalid for UpdateMetadataRequest. Valid versions are 0 or 1.")
-    }
-
-    size
-  }
-
-  override def toString(): String = {
-    describe(true)
-  }
-
-  override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
-    val errorResponse = new UpdateMetadataResponse(correlationId, Errors.forException(e).code)
-    requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
-  }
-
-  override def describe(details: Boolean): String = {
-    val updateMetadataRequest = new StringBuilder
-    updateMetadataRequest.append("Name:" + this.getClass.getSimpleName)
-    updateMetadataRequest.append(";Version:" + versionId)
-    updateMetadataRequest.append(";Controller:" + controllerId)
-    updateMetadataRequest.append(";ControllerEpoch:" + controllerEpoch)
-    updateMetadataRequest.append(";CorrelationId:" + correlationId)
-    updateMetadataRequest.append(";ClientId:" + clientId)
-    updateMetadataRequest.append(";AliveBrokers:" + aliveBrokers.mkString(","))
-    if(details)
-      updateMetadataRequest.append(";PartitionState:" + partitionStateInfos.mkString(","))
-    updateMetadataRequest.toString()
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/5df56145/core/src/main/scala/kafka/api/UpdateMetadataResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/UpdateMetadataResponse.scala b/core/src/main/scala/kafka/api/UpdateMetadataResponse.scala
deleted file mode 100644
index 3bdb3ca..0000000
--- a/core/src/main/scala/kafka/api/UpdateMetadataResponse.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.api
-
-import java.nio.ByteBuffer
-import org.apache.kafka.common.protocol.Errors
-
-object UpdateMetadataResponse {
-  def readFrom(buffer: ByteBuffer): UpdateMetadataResponse = {
-    val correlationId = buffer.getInt
-    val errorCode = buffer.getShort
-    new UpdateMetadataResponse(correlationId, errorCode)
-  }
-}
-
-case class UpdateMetadataResponse(correlationId: Int,
-                                  errorCode: Short = Errors.NONE.code)
-  extends RequestOrResponse() {
-  def sizeInBytes(): Int = 4 /* correlation id */ + 2 /* error code */
-
-  def writeTo(buffer: ByteBuffer) {
-    buffer.putInt(correlationId)
-    buffer.putShort(errorCode)
-  }
-
-  override def describe(details: Boolean):String = { toString }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/5df56145/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 47f7a34..2e63b53 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -64,7 +64,6 @@ object RequestChannel extends Logging {
     private val keyToNameAndDeserializerMap: Map[Short, (ByteBuffer) => RequestOrResponse]=
       Map(ApiKeys.FETCH.id -> FetchRequest.readFrom,
         ApiKeys.METADATA.id -> TopicMetadataRequest.readFrom,
-        ApiKeys.UPDATE_METADATA_KEY.id -> UpdateMetadataRequest.readFrom,
         ApiKeys.CONTROLLED_SHUTDOWN_KEY.id -> ControlledShutdownRequest.readFrom,
         ApiKeys.OFFSET_COMMIT.id -> OffsetCommitRequest.readFrom,
         ApiKeys.OFFSET_FETCH.id -> OffsetFetchRequest.readFrom

http://git-wip-us.apache.org/repos/asf/kafka/blob/5df56145/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index e48df90..f2e9533 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -39,7 +39,7 @@ import org.apache.kafka.common.protocol.{ProtoUtils, ApiKeys, Errors, SecurityPr
 import org.apache.kafka.common.requests.{ListOffsetRequest, ListOffsetResponse, GroupCoordinatorRequest, GroupCoordinatorResponse, ListGroupsResponse,
 DescribeGroupsRequest, DescribeGroupsResponse, HeartbeatRequest, HeartbeatResponse, JoinGroupRequest, JoinGroupResponse,
 LeaveGroupRequest, LeaveGroupResponse, ResponseHeader, ResponseSend, SyncGroupRequest, SyncGroupResponse, LeaderAndIsrRequest, LeaderAndIsrResponse,
-StopReplicaRequest, StopReplicaResponse, ProduceRequest, ProduceResponse}
+StopReplicaRequest, StopReplicaResponse, ProduceRequest, ProduceResponse, UpdateMetadataRequest, UpdateMetadataResponse}
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.{TopicPartition, Node}
@@ -175,14 +175,19 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   def handleUpdateMetadataRequest(request: RequestChannel.Request) {
-    val updateMetadataRequest = request.requestObj.asInstanceOf[UpdateMetadataRequest]
-
-    authorizeClusterAction(request)
+    val correlationId = request.header.correlationId
+    val updateMetadataRequest = request.body.asInstanceOf[UpdateMetadataRequest]
 
-    replicaManager.maybeUpdateMetadataCache(updateMetadataRequest, metadataCache)
+    val updateMetadataResponse =
+      if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
+        replicaManager.maybeUpdateMetadataCache(correlationId, updateMetadataRequest, metadataCache)
+        new UpdateMetadataResponse(Errors.NONE.code)
+      } else {
+        new UpdateMetadataResponse(Errors.CLUSTER_AUTHORIZATION_FAILED.code)
+      }
 
-    val updateMetadataResponse = new UpdateMetadataResponse(updateMetadataRequest.correlationId)
-    requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, updateMetadataResponse)))
+    val responseHeader = new ResponseHeader(correlationId)
+    requestChannel.sendResponse(new Response(request, new ResponseSend(request.connectionId, responseHeader, updateMetadataResponse)))
   }
 
   def handleControlledShutdownRequest(request: RequestChannel.Request) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/5df56145/core/src/main/scala/kafka/server/MetadataCache.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index f47a6aa..4be795d 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -17,19 +17,24 @@
 
 package kafka.server
 
-import kafka.cluster.{BrokerEndPoint,Broker}
+import kafka.cluster.{EndPoint, BrokerEndPoint, Broker}
 import kafka.common.TopicAndPartition
 
 import kafka.api._
 import kafka.controller.KafkaController.StateChangeLogger
+import kafka.controller.LeaderIsrAndControllerEpoch
 import org.apache.kafka.common.errors.{ReplicaNotAvailableException, LeaderNotAvailableException}
 import org.apache.kafka.common.protocol.{Errors, SecurityProtocol}
+import org.apache.kafka.common.requests.UpdateMetadataRequest
+import org.apache.kafka.common.requests.UpdateMetadataRequest.PartitionState
 import scala.collection.{Seq, Set, mutable}
+import scala.collection.JavaConverters._
 import kafka.utils.Logging
 import kafka.utils.CoreUtils._
 
 import java.util.concurrent.locks.ReentrantReadWriteLock
 
+
 /**
  *  A cache for the state (e.g., current leader) of each partition. This cache is updated through
  *  UpdateMetadataRequest from the controller. Every broker maintains the same cache, asynchronously.
@@ -120,29 +125,42 @@ private[server] class MetadataCache(brokerId: Int) extends Logging {
     }
   }
 
-  def updateCache(updateMetadataRequest: UpdateMetadataRequest,
+  def updateCache(correlationId: Int, updateMetadataRequest: UpdateMetadataRequest,
                   brokerId: Int,
                   stateChangeLogger: StateChangeLogger) {
     inWriteLock(partitionMetadataLock) {
-      aliveBrokers = updateMetadataRequest.aliveBrokers.map(b => (b.id, b)).toMap
-      updateMetadataRequest.partitionStateInfos.foreach { case(tp, info) =>
-        if (info.leaderIsrAndControllerEpoch.leaderAndIsr.leader == LeaderAndIsr.LeaderDuringDelete) {
+      aliveBrokers = updateMetadataRequest.liveBrokers.asScala.map { broker =>
+        val endPoints = broker.endPoints.asScala.map { case (protocol, ep) =>
+          (protocol, EndPoint(ep.host, ep.port, protocol))
+        }.toMap
+        (broker.id, Broker(broker.id, endPoints))
+      }.toMap
+
+      updateMetadataRequest.partitionStates.asScala.foreach { case (tp, info) =>
+        if (info.leader == LeaderAndIsr.LeaderDuringDelete) {
           removePartitionInfo(tp.topic, tp.partition)
           stateChangeLogger.trace(("Broker %d deleted partition %s from metadata cache in response to UpdateMetadata request " +
             "sent by controller %d epoch %d with correlation id %d")
             .format(brokerId, tp, updateMetadataRequest.controllerId,
-            updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId))
+              updateMetadataRequest.controllerEpoch, correlationId))
         } else {
-          addOrUpdatePartitionInfo(tp.topic, tp.partition, info)
+          val partitionInfo = partitionStateToPartitionStateInfo(info)
+          addOrUpdatePartitionInfo(tp.topic, tp.partition, partitionInfo)
           stateChangeLogger.trace(("Broker %d cached leader info %s for partition %s in response to UpdateMetadata request " +
             "sent by controller %d epoch %d with correlation id %d")
             .format(brokerId, info, tp, updateMetadataRequest.controllerId,
-            updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId))
+              updateMetadataRequest.controllerEpoch, correlationId))
         }
       }
     }
   }
 
+  private def partitionStateToPartitionStateInfo(partitionState: PartitionState): PartitionStateInfo = {
+    val leaderAndIsr = LeaderAndIsr(partitionState.leader, partitionState.leaderEpoch, partitionState.isr.asScala.map(_.toInt).toList, partitionState.zkVersion)
+    val leaderInfo = LeaderIsrAndControllerEpoch(leaderAndIsr, partitionState.controllerEpoch)
+    PartitionStateInfo(leaderInfo, partitionState.replicas.asScala.map(_.toInt))
+  }
+
   def contains(topic: String): Boolean = {
     inReadLock(partitionMetadataLock) {
       cache.contains(topic)

http://git-wip-us.apache.org/repos/asf/kafka/blob/5df56145/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 0ffb0e3..61b6887 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -34,8 +34,7 @@ InvalidTopicException, ControllerMovedException, NotLeaderForPartitionException,
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.LeaderAndIsrRequest
-import org.apache.kafka.common.requests.StopReplicaRequest
+import org.apache.kafka.common.requests.{LeaderAndIsrRequest, StopReplicaRequest, UpdateMetadataRequest}
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.{Time => JTime}
 
@@ -569,17 +568,17 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 
-  def maybeUpdateMetadataCache(updateMetadataRequest: UpdateMetadataRequest, metadataCache: MetadataCache) {
+  def maybeUpdateMetadataCache(correlationId: Int, updateMetadataRequest: UpdateMetadataRequest, metadataCache: MetadataCache) {
     replicaStateChangeLock synchronized {
       if(updateMetadataRequest.controllerEpoch < controllerEpoch) {
         val stateControllerEpochErrorMessage = ("Broker %d received update metadata request with correlation id %d from an " +
           "old controller %d with epoch %d. Latest known controller epoch is %d").format(localBrokerId,
-          updateMetadataRequest.correlationId, updateMetadataRequest.controllerId, updateMetadataRequest.controllerEpoch,
+          correlationId, updateMetadataRequest.controllerId, updateMetadataRequest.controllerEpoch,
           controllerEpoch)
         stateChangeLogger.warn(stateControllerEpochErrorMessage)
         throw new ControllerMovedException(stateControllerEpochErrorMessage)
       } else {
-        metadataCache.updateCache(updateMetadataRequest, localBrokerId, stateChangeLogger)
+        metadataCache.updateCache(correlationId, updateMetadataRequest, localBrokerId, stateChangeLogger)
         controllerEpoch = updateMetadataRequest.controllerEpoch
       }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5df56145/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
index 0fce611..b9bbace 100755
--- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
@@ -17,6 +17,7 @@
 
 package kafka.admin
 
+import kafka.api.TopicMetadata
 import org.junit.Assert._
 import org.apache.kafka.common.protocol.SecurityProtocol
 import kafka.zk.ZooKeeperTestHarness
@@ -138,7 +139,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
   }
 
   @Test
-  def testReplicaPlacement {
+  def testReplicaPlacementAllServers {
     AdminUtils.addPartitions(zkUtils, topic3, 7)
 
     // read metadata from a broker and verify the new topic partitions exist
@@ -149,52 +150,46 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
     TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 5)
     TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 6)
 
-    val metadata = ClientUtils.fetchTopicMetadata(Set(topic3), brokers.map(_.getBrokerEndPoint(SecurityProtocol.PLAINTEXT)), "AddPartitionsTest-testReplicaPlacement",
+    val metadata = ClientUtils.fetchTopicMetadata(Set(topic3), brokers.map(_.getBrokerEndPoint(SecurityProtocol.PLAINTEXT)), "AddPartitionsTest-testReplicaPlacementAllServers",
       2000,0).topicsMetadata
 
-    val metaDataForTopic3 = metadata.filter(p => p.topic.equals(topic3)).head
-    val partitionsMetadataForTopic3 = metaDataForTopic3.partitionsMetadata.sortBy(_.partitionId)
-    val partition1DataForTopic3 = partitionsMetadataForTopic3(1)
-    val partition2DataForTopic3 = partitionsMetadataForTopic3(2)
-    val partition3DataForTopic3 = partitionsMetadataForTopic3(3)
-    val partition4DataForTopic3 = partitionsMetadataForTopic3(4)
-    val partition5DataForTopic3 = partitionsMetadataForTopic3(5)
-    val partition6DataForTopic3 = partitionsMetadataForTopic3(6)
-
-    assertEquals(partition1DataForTopic3.replicas.size, 4)
-    assertEquals(partition1DataForTopic3.replicas(0).id, 3)
-    assertEquals(partition1DataForTopic3.replicas(1).id, 2)
-    assertEquals(partition1DataForTopic3.replicas(2).id, 0)
-    assertEquals(partition1DataForTopic3.replicas(3).id, 1)
-
-    assertEquals(partition2DataForTopic3.replicas.size, 4)
-    assertEquals(partition2DataForTopic3.replicas(0).id, 0)
-    assertEquals(partition2DataForTopic3.replicas(1).id, 3)
-    assertEquals(partition2DataForTopic3.replicas(2).id, 1)
-    assertEquals(partition2DataForTopic3.replicas(3).id, 2)
-
-    assertEquals(partition3DataForTopic3.replicas.size, 4)
-    assertEquals(partition3DataForTopic3.replicas(0).id, 1)
-    assertEquals(partition3DataForTopic3.replicas(1).id, 0)
-    assertEquals(partition3DataForTopic3.replicas(2).id, 2)
-    assertEquals(partition3DataForTopic3.replicas(3).id, 3)
-
-    assertEquals(partition4DataForTopic3.replicas.size, 4)
-    assertEquals(partition4DataForTopic3.replicas(0).id, 2)
-    assertEquals(partition4DataForTopic3.replicas(1).id, 3)
-    assertEquals(partition4DataForTopic3.replicas(2).id, 0)
-    assertEquals(partition4DataForTopic3.replicas(3).id, 1)
-
-    assertEquals(partition5DataForTopic3.replicas.size, 4)
-    assertEquals(partition5DataForTopic3.replicas(0).id, 3)
-    assertEquals(partition5DataForTopic3.replicas(1).id, 0)
-    assertEquals(partition5DataForTopic3.replicas(2).id, 1)
-    assertEquals(partition5DataForTopic3.replicas(3).id, 2)
-
-    assertEquals(partition6DataForTopic3.replicas.size, 4)
-    assertEquals(partition6DataForTopic3.replicas(0).id, 0)
-    assertEquals(partition6DataForTopic3.replicas(1).id, 1)
-    assertEquals(partition6DataForTopic3.replicas(2).id, 2)
-    assertEquals(partition6DataForTopic3.replicas(3).id, 3)
+    val metaDataForTopic3 = metadata.find(p => p.topic == topic3).get
+
+    validateLeaderAndReplicas(metaDataForTopic3, 0, 2, Set(2, 3, 0, 1))
+    validateLeaderAndReplicas(metaDataForTopic3, 1, 3, Set(3, 2, 0, 1))
+    validateLeaderAndReplicas(metaDataForTopic3, 2, 0, Set(0, 3, 1, 2))
+    validateLeaderAndReplicas(metaDataForTopic3, 3, 1, Set(1, 0, 2, 3))
+    validateLeaderAndReplicas(metaDataForTopic3, 4, 2, Set(2, 3, 0, 1))
+    validateLeaderAndReplicas(metaDataForTopic3, 5, 3, Set(3, 0, 1, 2))
+    validateLeaderAndReplicas(metaDataForTopic3, 6, 0, Set(0, 1, 2, 3))
+  }
+
+  @Test
+  def testReplicaPlacementPartialServers {
+    AdminUtils.addPartitions(zkUtils, topic2, 3)
+
+    // read metadata from a broker and verify the new topic partitions exist
+    TestUtils.waitUntilMetadataIsPropagated(servers, topic2, 1)
+    TestUtils.waitUntilMetadataIsPropagated(servers, topic2, 2)
+
+    val metadata = ClientUtils.fetchTopicMetadata(Set(topic2), brokers.map(_.getBrokerEndPoint(SecurityProtocol.PLAINTEXT)), "AddPartitionsTest-testReplicaPlacementPartialServers",
+      2000,0).topicsMetadata
+
+    val metaDataForTopic2 = metadata.find(p => p.topic == topic2).get
+
+    validateLeaderAndReplicas(metaDataForTopic2, 0, 1, Set(1, 2))
+    validateLeaderAndReplicas(metaDataForTopic2, 1, 2, Set(0, 2))
+    validateLeaderAndReplicas(metaDataForTopic2, 2, 3, Set(1, 3))
+  }
+
+  def validateLeaderAndReplicas(metadata: TopicMetadata, partitionId: Int, expectedLeaderId: Int, expectedReplicas: Set[Int]) = {
+    val partitionOpt = metadata.partitionsMetadata.find(_.partitionId == partitionId)
+    assertTrue(s"Partition $partitionId should exist", partitionOpt.isDefined)
+    val partition = partitionOpt.get
+
+    assertTrue("Partition leader should exist", partition.leader.isDefined)
+    assertEquals("Partition leader id should match", expectedLeaderId, partition.leader.get.id)
+
+    assertEquals("Replica set should match", expectedReplicas, partition.replicas.map(_.id).toSet)
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5df56145/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index cfbca00..e4b8854 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -17,7 +17,7 @@
 
 package kafka.api
 
-import kafka.cluster.{BrokerEndPoint, EndPoint, Broker}
+import kafka.cluster.{EndPoint, Broker}
 import kafka.common.{OffsetAndMetadata, OffsetMetadataAndError}
 import kafka.common._
 import kafka.message.{Message, ByteBufferMessageSet}
@@ -209,22 +209,6 @@ object SerializationTestUtils {
   def createConsumerMetadataResponse: GroupCoordinatorResponse = {
     GroupCoordinatorResponse(Some(brokers.head.getBrokerEndPoint(SecurityProtocol.PLAINTEXT)), Errors.NONE.code, 0)
   }
-
-  def createUpdateMetadataRequest(versionId: Short): UpdateMetadataRequest = {
-    UpdateMetadataRequest(
-      versionId,
-      correlationId = 0,
-      clientId = "client1",
-      controllerId = 0,
-      controllerEpoch = 0,
-      partitionStateInfos = updateMetadataRequestPartitionStateInfo,
-      brokers.toSet
-    )
-  }
-
-  def createUpdateMetadataResponse: UpdateMetadataResponse = {
-    UpdateMetadataResponse( correlationId = 0, errorCode = 0)
-  }
 }
 
 class RequestResponseSerializationTest extends JUnitSuite {
@@ -244,10 +228,6 @@ class RequestResponseSerializationTest extends JUnitSuite {
   private val consumerMetadataRequest = SerializationTestUtils.createConsumerMetadataRequest
   private val consumerMetadataResponse = SerializationTestUtils.createConsumerMetadataResponse
   private val consumerMetadataResponseNoCoordinator = GroupCoordinatorResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode, 0)
-  private val updateMetadataRequestV0 = SerializationTestUtils.createUpdateMetadataRequest(0)
-  private val updateMetadataRequestV1 = SerializationTestUtils.createUpdateMetadataRequest(1)
-  private val updateMetdataResponse = SerializationTestUtils.createUpdateMetadataResponse
-
 
   @Test
   def testSerializationAndDeserialization() {
@@ -259,7 +239,6 @@ class RequestResponseSerializationTest extends JUnitSuite {
                                offsetCommitRequestV0, offsetCommitRequestV1, offsetCommitRequestV2,
                                offsetCommitResponse, offsetFetchRequest, offsetFetchResponse,
                                consumerMetadataRequest, consumerMetadataResponse,
-                               updateMetadataRequestV0, updateMetadataRequestV1, updateMetdataResponse,
                                consumerMetadataResponseNoCoordinator)
 
     requestsAndResponses.foreach { original =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/5df56145/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala
index 1ef26f6..2400cfb 100644
--- a/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala
@@ -253,17 +253,19 @@ abstract class BaseTopicMetadataTest extends ZooKeeperTestHarness {
 
     // Assert that topic metadata at new brokers is updated correctly
     servers.filter(x => x.brokerState.currentState != NotRunning.state).foreach(x =>
-      waitUntilTrue(() =>
-        topicMetadata == ClientUtils.fetchTopicMetadata(
-          Set.empty,
-          Seq(new Broker(x.config.brokerId,
-            x.config.hostName,
-            x.boundPort()).getBrokerEndPoint(SecurityProtocol.PLAINTEXT)),
-          "TopicMetadataTest-testBasicTopicMetadata",
-          2000, 0), "Topic metadata is not correctly updated"))
+      waitUntilTrue(() => {
+          val foundMetadata = ClientUtils.fetchTopicMetadata(
+            Set.empty,
+            Seq(new Broker(x.config.brokerId,
+              x.config.hostName,
+              x.boundPort()).getBrokerEndPoint(SecurityProtocol.PLAINTEXT)),
+            "TopicMetadataTest-testBasicTopicMetadata", 2000, 0)
+          topicMetadata.brokers.sortBy(_.id) == foundMetadata.brokers.sortBy(_.id) &&
+            topicMetadata.topicsMetadata.sortBy(_.topic) == foundMetadata.topicsMetadata.sortBy(_.topic)
+        },
+        s"Topic metadata is not correctly updated"))
   }
 
-
   @Test
   def testAliveBrokerListWithNoTopics {
     checkMetadata(Seq(server1), 1)