Posted to commits@kafka.apache.org by sr...@apache.org on 2016/02/17 03:25:22 UTC
kafka git commit: KAFKA-2508; Replace UpdateMetadata{Request,Response} with o.a.k.c.requests equivalent
Repository: kafka
Updated Branches:
refs/heads/trunk 3ee1878d8 -> 5df56145a
KAFKA-2508; Replace UpdateMetadata{Request,Response} with o.a.k.c.requests equivalent
Author: Grant Henke <gr...@gmail.com>
Reviewers: Ismael Juma <is...@juma.me.uk>, Sriharsha Chintalapani <ma...@harsha.io>
Closes #896 from granthenke/update-metadata and squashes the following commits:
2eb5d59 [Grant Henke] Address reviews
497258d [Grant Henke] KAFKA-2508: Replace UpdateMetadata{Request,Response} with o.a.k.c.requests equivalent
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5df56145
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5df56145
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5df56145
Branch: refs/heads/trunk
Commit: 5df56145a51c78e8808046254ad4771d60957ac5
Parents: 3ee1878
Author: Grant Henke <gr...@gmail.com>
Authored: Tue Feb 16 18:25:00 2016 -0800
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Tue Feb 16 18:25:00 2016 -0800
----------------------------------------------------------------------
.../scala/kafka/api/UpdateMetadataRequest.scala | 147 -------------------
.../kafka/api/UpdateMetadataResponse.scala | 42 ------
.../scala/kafka/network/RequestChannel.scala | 1 -
.../src/main/scala/kafka/server/KafkaApis.scala | 19 ++-
.../main/scala/kafka/server/MetadataCache.scala | 34 ++++-
.../scala/kafka/server/ReplicaManager.scala | 9 +-
.../unit/kafka/admin/AddPartitionsTest.scala | 87 ++++++-----
.../api/RequestResponseSerializationTest.scala | 23 +--
.../integration/BaseTopicMetadataTest.scala | 20 +--
9 files changed, 95 insertions(+), 287 deletions(-)
----------------------------------------------------------------------
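In outline, this commit deletes the hand-serialized kafka.api.UpdateMetadata{Request,Response} classes and points KafkaApis, MetadataCache and ReplicaManager at the Java org.apache.kafka.common.requests equivalents. A minimal sketch of how broker-side code reads the new request object, using only the accessors that appear in the hunks below (controllerId, controllerEpoch, liveBrokers, partitionStates); the helper name is illustrative:

    import scala.collection.JavaConverters._
    import org.apache.kafka.common.requests.UpdateMetadataRequest

    // Typed accessors replace manual ByteBuffer parsing.
    def summarize(req: UpdateMetadataRequest): String = {
      val brokerIds = req.liveBrokers.asScala.map(_.id).mkString(",")
      val partitions = req.partitionStates.asScala.keys.mkString(",")
      s"controller=${req.controllerId} epoch=${req.controllerEpoch} " +
        s"brokers=[$brokerIds] partitions=[$partitions]"
    }
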
http://git-wip-us.apache.org/repos/asf/kafka/blob/5df56145/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala b/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala
deleted file mode 100644
index f761125..0000000
--- a/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala
+++ /dev/null
@@ -1,147 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.api
-
-import java.nio.ByteBuffer
-
-import kafka.api.ApiUtils._
-import kafka.cluster.{Broker, BrokerEndPoint}
-import kafka.common.{KafkaException, TopicAndPartition}
-import kafka.network.{RequestOrResponseSend, RequestChannel}
-import kafka.network.RequestChannel.Response
-import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol}
-
-import scala.collection.Set
-
-object UpdateMetadataRequest {
- val CurrentVersion = 1.shortValue
- val IsInit: Boolean = true
- val NotInit: Boolean = false
- val DefaultAckTimeout: Int = 1000
-
- def readFrom(buffer: ByteBuffer): UpdateMetadataRequest = {
- val versionId = buffer.getShort
- val correlationId = buffer.getInt
- val clientId = readShortString(buffer)
- val controllerId = buffer.getInt
- val controllerEpoch = buffer.getInt
- val partitionStateInfosCount = buffer.getInt
- val partitionStateInfos = new collection.mutable.HashMap[TopicAndPartition, PartitionStateInfo]
-
- for(i <- 0 until partitionStateInfosCount){
- val topic = readShortString(buffer)
- val partition = buffer.getInt
- val partitionStateInfo = PartitionStateInfo.readFrom(buffer)
-
- partitionStateInfos.put(TopicAndPartition(topic, partition), partitionStateInfo)
- }
-
- val numAliveBrokers = buffer.getInt
-
- val aliveBrokers = versionId match {
- case 0 => for(i <- 0 until numAliveBrokers) yield new Broker(BrokerEndPoint.readFrom(buffer),SecurityProtocol.PLAINTEXT)
- case 1 => for(i <- 0 until numAliveBrokers) yield Broker.readFrom(buffer)
- case v => throw new KafkaException( "Version " + v.toString + " is invalid for UpdateMetadataRequest. Valid versions are 0 or 1.")
- }
-
- new UpdateMetadataRequest(versionId, correlationId, clientId, controllerId, controllerEpoch,
- partitionStateInfos.toMap, aliveBrokers.toSet)
- }
-}
-
-case class UpdateMetadataRequest (versionId: Short,
- correlationId: Int,
- clientId: String,
- controllerId: Int,
- controllerEpoch: Int,
- partitionStateInfos: Map[TopicAndPartition, PartitionStateInfo],
- aliveBrokers: Set[Broker])
- extends RequestOrResponse(Some(ApiKeys.UPDATE_METADATA_KEY.id)) {
-
- def this(controllerId: Int, controllerEpoch: Int, correlationId: Int, clientId: String,
- partitionStateInfos: Map[TopicAndPartition, PartitionStateInfo], aliveBrokers: Set[Broker]) = {
- this(UpdateMetadataRequest.CurrentVersion, correlationId, clientId,
- controllerId, controllerEpoch, partitionStateInfos, aliveBrokers)
- }
-
- def writeTo(buffer: ByteBuffer) {
- buffer.putShort(versionId)
- buffer.putInt(correlationId)
- writeShortString(buffer, clientId)
- buffer.putInt(controllerId)
- buffer.putInt(controllerEpoch)
- buffer.putInt(partitionStateInfos.size)
- for((key, value) <- partitionStateInfos){
- writeShortString(buffer, key.topic)
- buffer.putInt(key.partition)
- value.writeTo(buffer)
- }
- buffer.putInt(aliveBrokers.size)
-
- versionId match {
- case 0 => aliveBrokers.foreach(_.getBrokerEndPoint(SecurityProtocol.PLAINTEXT).writeTo(buffer))
- case 1 => aliveBrokers.foreach(_.writeTo(buffer))
- case v => throw new KafkaException( "Version " + v.toString + " is invalid for UpdateMetadataRequest. Valid versions are 0 or 1.")
- }
- }
-
- def sizeInBytes(): Int = {
- var size =
- 2 /* version id */ +
- 4 /* correlation id */ +
- (2 + clientId.length) /* client id */ +
- 4 /* controller id */ +
- 4 /* controller epoch */ +
- 4 /* number of partitions */
- for((key, value) <- partitionStateInfos)
- size += (2 + key.topic.length) /* topic */ + 4 /* partition */ + value.sizeInBytes /* partition state info */
- size += 4 /* number of alive brokers in the cluster */
-
- versionId match {
- case 0 => for(broker <- aliveBrokers)
- size += broker.getBrokerEndPoint(SecurityProtocol.PLAINTEXT).sizeInBytes /* broker info */
- case 1 => for(broker <- aliveBrokers)
- size += broker.sizeInBytes
- case v => throw new KafkaException( "Version " + v.toString + " is invalid for UpdateMetadataRequest. Valid versions are 0 or 1.")
- }
-
- size
- }
-
- override def toString(): String = {
- describe(true)
- }
-
- override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
- val errorResponse = new UpdateMetadataResponse(correlationId, Errors.forException(e).code)
- requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
- }
-
- override def describe(details: Boolean): String = {
- val updateMetadataRequest = new StringBuilder
- updateMetadataRequest.append("Name:" + this.getClass.getSimpleName)
- updateMetadataRequest.append(";Version:" + versionId)
- updateMetadataRequest.append(";Controller:" + controllerId)
- updateMetadataRequest.append(";ControllerEpoch:" + controllerEpoch)
- updateMetadataRequest.append(";CorrelationId:" + correlationId)
- updateMetadataRequest.append(";ClientId:" + clientId)
- updateMetadataRequest.append(";AliveBrokers:" + aliveBrokers.mkString(","))
- if(details)
- updateMetadataRequest.append(";PartitionState:" + partitionStateInfos.mkString(","))
- updateMetadataRequest.toString()
- }
-}
\ No newline at end of file
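The readFrom/writeTo/sizeInBytes trio above is the pattern this change removes: every field is read and written positionally, so the three methods have to be kept in lock-step by hand. A sketch of the round-trip the deleted class supported, using only the constructor and methods shown above (empty maps/sets keep it minimal):

    import java.nio.ByteBuffer
    import kafka.api.UpdateMetadataRequest

    // Build a version-1 request, write it out, and read it back.
    val request = new UpdateMetadataRequest(controllerId = 0, controllerEpoch = 0,
      correlationId = 0, clientId = "client", partitionStateInfos = Map.empty,
      aliveBrokers = Set.empty)
    val buffer = ByteBuffer.allocate(request.sizeInBytes())
    request.writeTo(buffer)
    buffer.rewind()
    assert(UpdateMetadataRequest.readFrom(buffer) == request)
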
http://git-wip-us.apache.org/repos/asf/kafka/blob/5df56145/core/src/main/scala/kafka/api/UpdateMetadataResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/UpdateMetadataResponse.scala b/core/src/main/scala/kafka/api/UpdateMetadataResponse.scala
deleted file mode 100644
index 3bdb3ca..0000000
--- a/core/src/main/scala/kafka/api/UpdateMetadataResponse.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.api
-
-import java.nio.ByteBuffer
-import org.apache.kafka.common.protocol.Errors
-
-object UpdateMetadataResponse {
- def readFrom(buffer: ByteBuffer): UpdateMetadataResponse = {
- val correlationId = buffer.getInt
- val errorCode = buffer.getShort
- new UpdateMetadataResponse(correlationId, errorCode)
- }
-}
-
-case class UpdateMetadataResponse(correlationId: Int,
- errorCode: Short = Errors.NONE.code)
- extends RequestOrResponse() {
- def sizeInBytes(): Int = 4 /* correlation id */ + 2 /* error code */
-
- def writeTo(buffer: ByteBuffer) {
- buffer.putInt(correlationId)
- buffer.putShort(errorCode)
- }
-
- override def describe(details: Boolean):String = { toString }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5df56145/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 47f7a34..2e63b53 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -64,7 +64,6 @@ object RequestChannel extends Logging {
private val keyToNameAndDeserializerMap: Map[Short, (ByteBuffer) => RequestOrResponse]=
Map(ApiKeys.FETCH.id -> FetchRequest.readFrom,
ApiKeys.METADATA.id -> TopicMetadataRequest.readFrom,
- ApiKeys.UPDATE_METADATA_KEY.id -> UpdateMetadataRequest.readFrom,
ApiKeys.CONTROLLED_SHUTDOWN_KEY.id -> ControlledShutdownRequest.readFrom,
ApiKeys.OFFSET_COMMIT.id -> OffsetCommitRequest.readFrom,
ApiKeys.OFFSET_FETCH.id -> OffsetFetchRequest.readFrom
http://git-wip-us.apache.org/repos/asf/kafka/blob/5df56145/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index e48df90..f2e9533 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -39,7 +39,7 @@ import org.apache.kafka.common.protocol.{ProtoUtils, ApiKeys, Errors, SecurityPr
import org.apache.kafka.common.requests.{ListOffsetRequest, ListOffsetResponse, GroupCoordinatorRequest, GroupCoordinatorResponse, ListGroupsResponse,
DescribeGroupsRequest, DescribeGroupsResponse, HeartbeatRequest, HeartbeatResponse, JoinGroupRequest, JoinGroupResponse,
LeaveGroupRequest, LeaveGroupResponse, ResponseHeader, ResponseSend, SyncGroupRequest, SyncGroupResponse, LeaderAndIsrRequest, LeaderAndIsrResponse,
-StopReplicaRequest, StopReplicaResponse, ProduceRequest, ProduceResponse}
+StopReplicaRequest, StopReplicaResponse, ProduceRequest, ProduceResponse, UpdateMetadataRequest, UpdateMetadataResponse}
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.{TopicPartition, Node}
@@ -175,14 +175,19 @@ class KafkaApis(val requestChannel: RequestChannel,
}
def handleUpdateMetadataRequest(request: RequestChannel.Request) {
- val updateMetadataRequest = request.requestObj.asInstanceOf[UpdateMetadataRequest]
-
- authorizeClusterAction(request)
+ val correlationId = request.header.correlationId
+ val updateMetadataRequest = request.body.asInstanceOf[UpdateMetadataRequest]
- replicaManager.maybeUpdateMetadataCache(updateMetadataRequest, metadataCache)
+ val updateMetadataResponse =
+ if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
+ replicaManager.maybeUpdateMetadataCache(correlationId, updateMetadataRequest, metadataCache)
+ new UpdateMetadataResponse(Errors.NONE.code)
+ } else {
+ new UpdateMetadataResponse(Errors.CLUSTER_AUTHORIZATION_FAILED.code)
+ }
- val updateMetadataResponse = new UpdateMetadataResponse(updateMetadataRequest.correlationId)
- requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, updateMetadataResponse)))
+ val responseHeader = new ResponseHeader(correlationId)
+ requestChannel.sendResponse(new Response(request, new ResponseSend(request.connectionId, responseHeader, updateMetadataResponse)))
}
def handleControlledShutdownRequest(request: RequestChannel.Request) {
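Behavioral note on the hunk above: the old handler called authorizeClusterAction, which threw on failure, while the new one reports authorization failure in-band as CLUSTER_AUTHORIZATION_FAILED in the response body, with the correlation id moving from the body into the ResponseHeader. Condensed, the authorize-act-answer shape shared by the migrated APIs looks like this (names taken from the hunk above, except doTheWork, which is a placeholder):

    val response =
      if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
        doTheWork() // here: update the metadata cache
        new UpdateMetadataResponse(Errors.NONE.code)
      } else {
        new UpdateMetadataResponse(Errors.CLUSTER_AUTHORIZATION_FAILED.code)
      }
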
http://git-wip-us.apache.org/repos/asf/kafka/blob/5df56145/core/src/main/scala/kafka/server/MetadataCache.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index f47a6aa..4be795d 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -17,19 +17,24 @@
package kafka.server
-import kafka.cluster.{BrokerEndPoint,Broker}
+import kafka.cluster.{EndPoint, BrokerEndPoint, Broker}
import kafka.common.TopicAndPartition
import kafka.api._
import kafka.controller.KafkaController.StateChangeLogger
+import kafka.controller.LeaderIsrAndControllerEpoch
import org.apache.kafka.common.errors.{ReplicaNotAvailableException, LeaderNotAvailableException}
import org.apache.kafka.common.protocol.{Errors, SecurityProtocol}
+import org.apache.kafka.common.requests.UpdateMetadataRequest
+import org.apache.kafka.common.requests.UpdateMetadataRequest.PartitionState
import scala.collection.{Seq, Set, mutable}
+import scala.collection.JavaConverters._
import kafka.utils.Logging
import kafka.utils.CoreUtils._
import java.util.concurrent.locks.ReentrantReadWriteLock
+
/**
* A cache for the state (e.g., current leader) of each partition. This cache is updated through
* UpdateMetadataRequest from the controller. Every broker maintains the same cache, asynchronously.
@@ -120,29 +125,42 @@ private[server] class MetadataCache(brokerId: Int) extends Logging {
}
}
- def updateCache(updateMetadataRequest: UpdateMetadataRequest,
+ def updateCache(correlationId: Int, updateMetadataRequest: UpdateMetadataRequest,
brokerId: Int,
stateChangeLogger: StateChangeLogger) {
inWriteLock(partitionMetadataLock) {
- aliveBrokers = updateMetadataRequest.aliveBrokers.map(b => (b.id, b)).toMap
- updateMetadataRequest.partitionStateInfos.foreach { case(tp, info) =>
- if (info.leaderIsrAndControllerEpoch.leaderAndIsr.leader == LeaderAndIsr.LeaderDuringDelete) {
+ aliveBrokers = updateMetadataRequest.liveBrokers.asScala.map { broker =>
+ val endPoints = broker.endPoints.asScala.map { case (protocol, ep) =>
+ (protocol, EndPoint(ep.host, ep.port, protocol))
+ }.toMap
+ (broker.id, Broker(broker.id, endPoints))
+ }.toMap
+
+ updateMetadataRequest.partitionStates.asScala.foreach { case (tp, info) =>
+ if (info.leader == LeaderAndIsr.LeaderDuringDelete) {
removePartitionInfo(tp.topic, tp.partition)
stateChangeLogger.trace(("Broker %d deleted partition %s from metadata cache in response to UpdateMetadata request " +
"sent by controller %d epoch %d with correlation id %d")
.format(brokerId, tp, updateMetadataRequest.controllerId,
- updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId))
+ updateMetadataRequest.controllerEpoch, correlationId))
} else {
- addOrUpdatePartitionInfo(tp.topic, tp.partition, info)
+ val partitionInfo = partitionStateToPartitionStateInfo(info)
+ addOrUpdatePartitionInfo(tp.topic, tp.partition, partitionInfo)
stateChangeLogger.trace(("Broker %d cached leader info %s for partition %s in response to UpdateMetadata request " +
"sent by controller %d epoch %d with correlation id %d")
.format(brokerId, info, tp, updateMetadataRequest.controllerId,
- updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId))
+ updateMetadataRequest.controllerEpoch, correlationId))
}
}
}
}
+ private def partitionStateToPartitionStateInfo(partitionState: PartitionState): PartitionStateInfo = {
+ val leaderAndIsr = LeaderAndIsr(partitionState.leader, partitionState.leaderEpoch, partitionState.isr.asScala.map(_.toInt).toList, partitionState.zkVersion)
+ val leaderInfo = LeaderIsrAndControllerEpoch(leaderAndIsr, partitionState.controllerEpoch)
+ PartitionStateInfo(leaderInfo, partitionState.replicas.asScala.map(_.toInt))
+ }
+
def contains(topic: String): Boolean = {
inReadLock(partitionMetadataLock) {
cache.contains(topic)
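The cache update above leans on scala.collection.JavaConverters to walk the Java collections the new request exposes (liveBrokers, endPoints, partitionStates, isr, replicas). A self-contained illustration of that conversion pattern, with made-up values:

    import scala.collection.JavaConverters._

    // java.util.List[Integer] -> List[Int], as done for isr/replicas above.
    val javaIsr = java.util.Arrays.asList(Integer.valueOf(1), Integer.valueOf(2))
    val isr: List[Int] = javaIsr.asScala.map(_.toInt).toList // List(1, 2)

    // java.util.Map -> immutable Scala Map, as done for endPoints above.
    val javaPorts = new java.util.HashMap[String, Integer]()
    javaPorts.put("PLAINTEXT", 9092)
    val ports: Map[String, Int] = javaPorts.asScala.map { case (k, v) => (k, v.toInt) }.toMap
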
http://git-wip-us.apache.org/repos/asf/kafka/blob/5df56145/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 0ffb0e3..61b6887 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -34,8 +34,7 @@ InvalidTopicException, ControllerMovedException, NotLeaderForPartitionException,
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.LeaderAndIsrRequest
-import org.apache.kafka.common.requests.StopReplicaRequest
+import org.apache.kafka.common.requests.{LeaderAndIsrRequest, StopReplicaRequest, UpdateMetadataRequest}
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.utils.{Time => JTime}
@@ -569,17 +568,17 @@ class ReplicaManager(val config: KafkaConfig,
}
}
- def maybeUpdateMetadataCache(updateMetadataRequest: UpdateMetadataRequest, metadataCache: MetadataCache) {
+ def maybeUpdateMetadataCache(correlationId: Int, updateMetadataRequest: UpdateMetadataRequest, metadataCache: MetadataCache) {
replicaStateChangeLock synchronized {
if(updateMetadataRequest.controllerEpoch < controllerEpoch) {
val stateControllerEpochErrorMessage = ("Broker %d received update metadata request with correlation id %d from an " +
"old controller %d with epoch %d. Latest known controller epoch is %d").format(localBrokerId,
- updateMetadataRequest.correlationId, updateMetadataRequest.controllerId, updateMetadataRequest.controllerEpoch,
+ correlationId, updateMetadataRequest.controllerId, updateMetadataRequest.controllerEpoch,
controllerEpoch)
stateChangeLogger.warn(stateControllerEpochErrorMessage)
throw new ControllerMovedException(stateControllerEpochErrorMessage)
} else {
- metadataCache.updateCache(updateMetadataRequest, localBrokerId, stateChangeLogger)
+ metadataCache.updateCache(correlationId, updateMetadataRequest, localBrokerId, stateChangeLogger)
controllerEpoch = updateMetadataRequest.controllerEpoch
}
}
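The epoch guard above is the usual controller-fencing idiom: a broker remembers the highest controller epoch it has seen and rejects metadata from anything older, so a deposed controller cannot overwrite newer state. Reduced to its essentials (standalone sketch, all names hypothetical):

    // Fencing sketch: apply an update only if its epoch is current or newer.
    class EpochFencedState[T](var state: T) {
      private var latestEpoch = 0
      def update(epoch: Int, newState: T): Unit = synchronized {
        if (epoch < latestEpoch)
          throw new IllegalStateException(s"stale controller epoch $epoch < $latestEpoch")
        state = newState
        latestEpoch = epoch
      }
    }
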
http://git-wip-us.apache.org/repos/asf/kafka/blob/5df56145/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
index 0fce611..b9bbace 100755
--- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
@@ -17,6 +17,7 @@
package kafka.admin
+import kafka.api.TopicMetadata
import org.junit.Assert._
import org.apache.kafka.common.protocol.SecurityProtocol
import kafka.zk.ZooKeeperTestHarness
@@ -138,7 +139,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
}
@Test
- def testReplicaPlacement {
+ def testReplicaPlacementAllServers {
AdminUtils.addPartitions(zkUtils, topic3, 7)
// read metadata from a broker and verify the new topic partitions exist
@@ -149,52 +150,46 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 5)
TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 6)
- val metadata = ClientUtils.fetchTopicMetadata(Set(topic3), brokers.map(_.getBrokerEndPoint(SecurityProtocol.PLAINTEXT)), "AddPartitionsTest-testReplicaPlacement",
+ val metadata = ClientUtils.fetchTopicMetadata(Set(topic3), brokers.map(_.getBrokerEndPoint(SecurityProtocol.PLAINTEXT)), "AddPartitionsTest-testReplicaPlacementAllServers",
2000,0).topicsMetadata
- val metaDataForTopic3 = metadata.filter(p => p.topic.equals(topic3)).head
- val partitionsMetadataForTopic3 = metaDataForTopic3.partitionsMetadata.sortBy(_.partitionId)
- val partition1DataForTopic3 = partitionsMetadataForTopic3(1)
- val partition2DataForTopic3 = partitionsMetadataForTopic3(2)
- val partition3DataForTopic3 = partitionsMetadataForTopic3(3)
- val partition4DataForTopic3 = partitionsMetadataForTopic3(4)
- val partition5DataForTopic3 = partitionsMetadataForTopic3(5)
- val partition6DataForTopic3 = partitionsMetadataForTopic3(6)
-
- assertEquals(partition1DataForTopic3.replicas.size, 4)
- assertEquals(partition1DataForTopic3.replicas(0).id, 3)
- assertEquals(partition1DataForTopic3.replicas(1).id, 2)
- assertEquals(partition1DataForTopic3.replicas(2).id, 0)
- assertEquals(partition1DataForTopic3.replicas(3).id, 1)
-
- assertEquals(partition2DataForTopic3.replicas.size, 4)
- assertEquals(partition2DataForTopic3.replicas(0).id, 0)
- assertEquals(partition2DataForTopic3.replicas(1).id, 3)
- assertEquals(partition2DataForTopic3.replicas(2).id, 1)
- assertEquals(partition2DataForTopic3.replicas(3).id, 2)
-
- assertEquals(partition3DataForTopic3.replicas.size, 4)
- assertEquals(partition3DataForTopic3.replicas(0).id, 1)
- assertEquals(partition3DataForTopic3.replicas(1).id, 0)
- assertEquals(partition3DataForTopic3.replicas(2).id, 2)
- assertEquals(partition3DataForTopic3.replicas(3).id, 3)
-
- assertEquals(partition4DataForTopic3.replicas.size, 4)
- assertEquals(partition4DataForTopic3.replicas(0).id, 2)
- assertEquals(partition4DataForTopic3.replicas(1).id, 3)
- assertEquals(partition4DataForTopic3.replicas(2).id, 0)
- assertEquals(partition4DataForTopic3.replicas(3).id, 1)
-
- assertEquals(partition5DataForTopic3.replicas.size, 4)
- assertEquals(partition5DataForTopic3.replicas(0).id, 3)
- assertEquals(partition5DataForTopic3.replicas(1).id, 0)
- assertEquals(partition5DataForTopic3.replicas(2).id, 1)
- assertEquals(partition5DataForTopic3.replicas(3).id, 2)
-
- assertEquals(partition6DataForTopic3.replicas.size, 4)
- assertEquals(partition6DataForTopic3.replicas(0).id, 0)
- assertEquals(partition6DataForTopic3.replicas(1).id, 1)
- assertEquals(partition6DataForTopic3.replicas(2).id, 2)
- assertEquals(partition6DataForTopic3.replicas(3).id, 3)
+ val metaDataForTopic3 = metadata.find(p => p.topic == topic3).get
+
+ validateLeaderAndReplicas(metaDataForTopic3, 0, 2, Set(2, 3, 0, 1))
+ validateLeaderAndReplicas(metaDataForTopic3, 1, 3, Set(3, 2, 0, 1))
+ validateLeaderAndReplicas(metaDataForTopic3, 2, 0, Set(0, 3, 1, 2))
+ validateLeaderAndReplicas(metaDataForTopic3, 3, 1, Set(1, 0, 2, 3))
+ validateLeaderAndReplicas(metaDataForTopic3, 4, 2, Set(2, 3, 0, 1))
+ validateLeaderAndReplicas(metaDataForTopic3, 5, 3, Set(3, 0, 1, 2))
+ validateLeaderAndReplicas(metaDataForTopic3, 6, 0, Set(0, 1, 2, 3))
+ }
+
+ @Test
+ def testReplicaPlacementPartialServers {
+ AdminUtils.addPartitions(zkUtils, topic2, 3)
+
+ // read metadata from a broker and verify the new topic partitions exist
+ TestUtils.waitUntilMetadataIsPropagated(servers, topic2, 1)
+ TestUtils.waitUntilMetadataIsPropagated(servers, topic2, 2)
+
+ val metadata = ClientUtils.fetchTopicMetadata(Set(topic2), brokers.map(_.getBrokerEndPoint(SecurityProtocol.PLAINTEXT)), "AddPartitionsTest-testReplicaPlacementPartialServers",
+ 2000,0).topicsMetadata
+
+ val metaDataForTopic2 = metadata.find(p => p.topic == topic2).get
+
+ validateLeaderAndReplicas(metaDataForTopic2, 0, 1, Set(1, 2))
+ validateLeaderAndReplicas(metaDataForTopic2, 1, 2, Set(0, 2))
+ validateLeaderAndReplicas(metaDataForTopic2, 2, 3, Set(1, 3))
+ }
+
+ def validateLeaderAndReplicas(metadata: TopicMetadata, partitionId: Int, expectedLeaderId: Int, expectedReplicas: Set[Int]) = {
+ val partitionOpt = metadata.partitionsMetadata.find(_.partitionId == partitionId)
+ assertTrue(s"Partition $partitionId should exist", partitionOpt.isDefined)
+ val partition = partitionOpt.get
+
+ assertTrue("Partition leader should exist", partition.leader.isDefined)
+ assertEquals("Partition leader id should match", expectedLeaderId, partition.leader.get.id)
+
+ assertEquals("Replica set should match", expectedReplicas, partition.replicas.map(_.id).toSet)
}
}
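One consequence of the test refactor above: the old per-index assertions pinned the exact replica order returned in the metadata, while validateLeaderAndReplicas compares replica ids as a Set, so the tests now tolerate reordering. The difference in miniature (illustrative values):

    val replicaIds = Seq(3, 2, 0, 1)            // as returned by the broker
    assert(replicaIds.toSet == Set(0, 1, 2, 3)) // new style: order-insensitive
    assert(replicaIds(0) == 3)                  // old style: pins position 0
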
http://git-wip-us.apache.org/repos/asf/kafka/blob/5df56145/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index cfbca00..e4b8854 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -17,7 +17,7 @@
package kafka.api
-import kafka.cluster.{BrokerEndPoint, EndPoint, Broker}
+import kafka.cluster.{EndPoint, Broker}
import kafka.common.{OffsetAndMetadata, OffsetMetadataAndError}
import kafka.common._
import kafka.message.{Message, ByteBufferMessageSet}
@@ -209,22 +209,6 @@ object SerializationTestUtils {
def createConsumerMetadataResponse: GroupCoordinatorResponse = {
GroupCoordinatorResponse(Some(brokers.head.getBrokerEndPoint(SecurityProtocol.PLAINTEXT)), Errors.NONE.code, 0)
}
-
- def createUpdateMetadataRequest(versionId: Short): UpdateMetadataRequest = {
- UpdateMetadataRequest(
- versionId,
- correlationId = 0,
- clientId = "client1",
- controllerId = 0,
- controllerEpoch = 0,
- partitionStateInfos = updateMetadataRequestPartitionStateInfo,
- brokers.toSet
- )
- }
-
- def createUpdateMetadataResponse: UpdateMetadataResponse = {
- UpdateMetadataResponse( correlationId = 0, errorCode = 0)
- }
}
class RequestResponseSerializationTest extends JUnitSuite {
@@ -244,10 +228,6 @@ class RequestResponseSerializationTest extends JUnitSuite {
private val consumerMetadataRequest = SerializationTestUtils.createConsumerMetadataRequest
private val consumerMetadataResponse = SerializationTestUtils.createConsumerMetadataResponse
private val consumerMetadataResponseNoCoordinator = GroupCoordinatorResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode, 0)
- private val updateMetadataRequestV0 = SerializationTestUtils.createUpdateMetadataRequest(0)
- private val updateMetadataRequestV1 = SerializationTestUtils.createUpdateMetadataRequest(1)
- private val updateMetdataResponse = SerializationTestUtils.createUpdateMetadataResponse
-
@Test
def testSerializationAndDeserialization() {
@@ -259,7 +239,6 @@ class RequestResponseSerializationTest extends JUnitSuite {
offsetCommitRequestV0, offsetCommitRequestV1, offsetCommitRequestV2,
offsetCommitResponse, offsetFetchRequest, offsetFetchResponse,
consumerMetadataRequest, consumerMetadataResponse,
- updateMetadataRequestV0, updateMetadataRequestV1, updateMetdataResponse,
consumerMetadataResponseNoCoordinator)
requestsAndResponses.foreach { original =>
http://git-wip-us.apache.org/repos/asf/kafka/blob/5df56145/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala
index 1ef26f6..2400cfb 100644
--- a/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala
@@ -253,17 +253,19 @@ abstract class BaseTopicMetadataTest extends ZooKeeperTestHarness {
// Assert that topic metadata at new brokers is updated correctly
servers.filter(x => x.brokerState.currentState != NotRunning.state).foreach(x =>
- waitUntilTrue(() =>
- topicMetadata == ClientUtils.fetchTopicMetadata(
- Set.empty,
- Seq(new Broker(x.config.brokerId,
- x.config.hostName,
- x.boundPort()).getBrokerEndPoint(SecurityProtocol.PLAINTEXT)),
- "TopicMetadataTest-testBasicTopicMetadata",
- 2000, 0), "Topic metadata is not correctly updated"))
+ waitUntilTrue(() => {
+ val foundMetadata = ClientUtils.fetchTopicMetadata(
+ Set.empty,
+ Seq(new Broker(x.config.brokerId,
+ x.config.hostName,
+ x.boundPort()).getBrokerEndPoint(SecurityProtocol.PLAINTEXT)),
+ "TopicMetadataTest-testBasicTopicMetadata", 2000, 0)
+ topicMetadata.brokers.sortBy(_.id) == foundMetadata.brokers.sortBy(_.id) &&
+ topicMetadata.topicsMetadata.sortBy(_.topic) == foundMetadata.topicsMetadata.sortBy(_.topic)
+ },
+ s"Topic metadata is not correctly updated"))
}
-
@Test
def testAliveBrokerListWithNoTopics {
checkMetadata(Seq(server1), 1)