You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2016/01/07 01:08:19 UTC
kafka git commit: KAFKA-2072;
Replace StopReplica Request/Response with their
org.apache.kafka.common.requests equivalents
Repository: kafka
Updated Branches:
refs/heads/trunk d9cabfde6 -> 00114dae7
KAFKA-2072; Replace StopReplica Request/Response with their org.apache.kafka.common.requests equivalents
Author: David Jacot <da...@gmail.com>
Reviewers: Ismael Juma <is...@juma.me.uk>, Grant Henke <gr...@gmail.com>, Jun Rao <ju...@gmail.com>
Closes #196 from dajac/KAFKA-2072-part-2
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/00114dae
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/00114dae
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/00114dae
Branch: refs/heads/trunk
Commit: 00114dae7a0ecb361aa5b843d5c7a95381294bd2
Parents: d9cabfd
Author: David Jacot <da...@gmail.com>
Authored: Wed Jan 6 16:08:16 2016 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Wed Jan 6 16:08:16 2016 -0800
----------------------------------------------------------------------
.../scala/kafka/api/StopReplicaRequest.scala | 126 -------------------
.../scala/kafka/api/StopReplicaResponse.scala | 75 -----------
.../scala/kafka/network/RequestChannel.scala | 1 -
.../src/main/scala/kafka/server/KafkaApis.scala | 29 +++--
.../scala/kafka/server/ReplicaManager.scala | 21 ++--
.../api/RequestResponseSerializationTest.scala | 16 +--
6 files changed, 31 insertions(+), 237 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/00114dae/core/src/main/scala/kafka/api/StopReplicaRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/StopReplicaRequest.scala b/core/src/main/scala/kafka/api/StopReplicaRequest.scala
deleted file mode 100644
index 03c7f3d..0000000
--- a/core/src/main/scala/kafka/api/StopReplicaRequest.scala
+++ /dev/null
@@ -1,126 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.api
-
-import java.nio._
-import kafka.api.ApiUtils._
-import kafka.network.{RequestOrResponseSend, RequestChannel, InvalidRequestException}
-import kafka.common.{TopicAndPartition, ErrorMapping}
-import kafka.network.RequestChannel.Response
-import kafka.utils.Logging
-import org.apache.kafka.common.protocol.ApiKeys
-import collection.Set
-
-
-object StopReplicaRequest extends Logging {
- val CurrentVersion = 0.shortValue
- val DefaultClientId = ""
- val DefaultAckTimeout = 100
-
- def readFrom(buffer: ByteBuffer): StopReplicaRequest = {
- val versionId = buffer.getShort
- val correlationId = buffer.getInt
- val clientId = readShortString(buffer)
- val controllerId = buffer.getInt
- val controllerEpoch = buffer.getInt
- val deletePartitions = buffer.get match {
- case 1 => true
- case 0 => false
- case x =>
- throw new InvalidRequestException("Invalid byte %d in delete partitions field. (Assuming false.)".format(x))
- }
- val topicPartitionPairCount = buffer.getInt
- val topicPartitionPairSet = new collection.mutable.HashSet[TopicAndPartition]()
- (1 to topicPartitionPairCount) foreach { _ =>
- topicPartitionPairSet.add(TopicAndPartition(readShortString(buffer), buffer.getInt))
- }
- StopReplicaRequest(versionId, correlationId, clientId, controllerId, controllerEpoch,
- deletePartitions, topicPartitionPairSet.toSet)
- }
-}
-
-case class StopReplicaRequest(versionId: Short,
- correlationId: Int,
- clientId: String,
- controllerId: Int,
- controllerEpoch: Int,
- deletePartitions: Boolean,
- partitions: Set[TopicAndPartition])
- extends RequestOrResponse(Some(ApiKeys.STOP_REPLICA.id)) {
-
- def this(deletePartitions: Boolean, partitions: Set[TopicAndPartition], controllerId: Int, controllerEpoch: Int, correlationId: Int) = {
- this(StopReplicaRequest.CurrentVersion, correlationId, StopReplicaRequest.DefaultClientId,
- controllerId, controllerEpoch, deletePartitions, partitions)
- }
-
- def writeTo(buffer: ByteBuffer) {
- buffer.putShort(versionId)
- buffer.putInt(correlationId)
- writeShortString(buffer, clientId)
- buffer.putInt(controllerId)
- buffer.putInt(controllerEpoch)
- buffer.put(if (deletePartitions) 1.toByte else 0.toByte)
- buffer.putInt(partitions.size)
- for (topicAndPartition <- partitions) {
- writeShortString(buffer, topicAndPartition.topic)
- buffer.putInt(topicAndPartition.partition)
- }
- }
-
- def sizeInBytes(): Int = {
- var size =
- 2 + /* versionId */
- 4 + /* correlation id */
- ApiUtils.shortStringLength(clientId) +
- 4 + /* controller id*/
- 4 + /* controller epoch */
- 1 + /* deletePartitions */
- 4 /* partition count */
- for (topicAndPartition <- partitions){
- size += (ApiUtils.shortStringLength(topicAndPartition.topic)) +
- 4 /* partition id */
- }
- size
- }
-
- override def toString(): String = {
- describe(true)
- }
-
- override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
- val responseMap = partitions.map {
- case topicAndPartition => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
- }.toMap
- val errorResponse = StopReplicaResponse(correlationId, responseMap)
- requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
- }
-
- override def describe(details: Boolean): String = {
- val stopReplicaRequest = new StringBuilder
- stopReplicaRequest.append("Name: " + this.getClass.getSimpleName)
- stopReplicaRequest.append("; Version: " + versionId)
- stopReplicaRequest.append("; CorrelationId: " + correlationId)
- stopReplicaRequest.append("; ClientId: " + clientId)
- stopReplicaRequest.append("; DeletePartitions: " + deletePartitions)
- stopReplicaRequest.append("; ControllerId: " + controllerId)
- stopReplicaRequest.append("; ControllerEpoch: " + controllerEpoch)
- if(details)
- stopReplicaRequest.append("; Partitions: " + partitions.mkString(","))
- stopReplicaRequest.toString()
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/00114dae/core/src/main/scala/kafka/api/StopReplicaResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/StopReplicaResponse.scala b/core/src/main/scala/kafka/api/StopReplicaResponse.scala
deleted file mode 100644
index 2fc3c95..0000000
--- a/core/src/main/scala/kafka/api/StopReplicaResponse.scala
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.api
-
-import java.nio.ByteBuffer
-import collection.mutable.HashMap
-import collection.immutable.Map
-import kafka.common.{TopicAndPartition, ErrorMapping}
-import kafka.api.ApiUtils._
-
-
-object StopReplicaResponse {
- def readFrom(buffer: ByteBuffer): StopReplicaResponse = {
- val correlationId = buffer.getInt
- val errorCode = buffer.getShort
- val numEntries = buffer.getInt
-
- val responseMap = new HashMap[TopicAndPartition, Short]()
- for (i<- 0 until numEntries){
- val topic = readShortString(buffer)
- val partition = buffer.getInt
- val partitionErrorCode = buffer.getShort()
- responseMap.put(TopicAndPartition(topic, partition), partitionErrorCode)
- }
- new StopReplicaResponse(correlationId, responseMap.toMap, errorCode)
- }
-}
-
-
-case class StopReplicaResponse(correlationId: Int,
- responseMap: Map[TopicAndPartition, Short],
- errorCode: Short = ErrorMapping.NoError)
- extends RequestOrResponse() {
- def sizeInBytes(): Int ={
- var size =
- 4 /* correlation id */ +
- 2 /* error code */ +
- 4 /* number of responses */
- for ((key, value) <- responseMap) {
- size +=
- 2 + key.topic.length /* topic */ +
- 4 /* partition */ +
- 2 /* error code for this partition */
- }
- size
- }
-
- def writeTo(buffer: ByteBuffer) {
- buffer.putInt(correlationId)
- buffer.putShort(errorCode)
- buffer.putInt(responseMap.size)
- for ((topicAndPartition, errorCode) <- responseMap){
- writeShortString(buffer, topicAndPartition.topic)
- buffer.putInt(topicAndPartition.partition)
- buffer.putShort(errorCode)
- }
- }
-
- override def describe(details: Boolean):String = { toString }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/00114dae/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 1ab51da..998f51a 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -69,7 +69,6 @@ object RequestChannel extends Logging {
Map(ApiKeys.PRODUCE.id -> ProducerRequest.readFrom,
ApiKeys.FETCH.id -> FetchRequest.readFrom,
ApiKeys.METADATA.id -> TopicMetadataRequest.readFrom,
- ApiKeys.STOP_REPLICA.id -> StopReplicaRequest.readFrom,
ApiKeys.UPDATE_METADATA_KEY.id -> UpdateMetadataRequest.readFrom,
ApiKeys.CONTROLLED_SHUTDOWN_KEY.id -> ControlledShutdownRequest.readFrom,
ApiKeys.OFFSET_COMMIT.id -> OffsetCommitRequest.readFrom,
http://git-wip-us.apache.org/repos/asf/kafka/blob/00114dae/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index cbf5031..5fda0eb 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -36,7 +36,8 @@ import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol}
import org.apache.kafka.common.requests.{ListOffsetRequest, ListOffsetResponse, GroupCoordinatorRequest, GroupCoordinatorResponse, ListGroupsResponse,
DescribeGroupsRequest, DescribeGroupsResponse, HeartbeatRequest, HeartbeatResponse, JoinGroupRequest, JoinGroupResponse,
-LeaveGroupRequest, LeaveGroupResponse, ResponseHeader, ResponseSend, SyncGroupRequest, SyncGroupResponse, LeaderAndIsrRequest, LeaderAndIsrResponse}
+LeaveGroupRequest, LeaveGroupResponse, ResponseHeader, ResponseSend, SyncGroupRequest, SyncGroupResponse, LeaderAndIsrRequest, LeaderAndIsrResponse,
+StopReplicaRequest, StopReplicaResponse}
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.{TopicPartition, Node}
@@ -90,9 +91,10 @@ class KafkaApis(val requestChannel: RequestChannel,
}
} catch {
case e: Throwable =>
- if ( request.requestObj != null)
+ if (request.requestObj != null) {
request.requestObj.handleError(e, requestChannel, request)
- else {
+ error("Error when handling request %s".format(request.requestObj), e)
+ } else {
val response = request.body.getErrorResponse(request.header.apiVersion, e)
val respHeader = new ResponseHeader(request.header.correlationId)
@@ -102,8 +104,10 @@ class KafkaApis(val requestChannel: RequestChannel,
requestChannel.closeConnection(request.processor, request)
else
requestChannel.sendResponse(new Response(request, new ResponseSend(request.connectionId, respHeader, response)))
+
+ error("Error when handling request %s".format(request.body), e)
}
- error("error when handling request %s".format(request.requestObj), e)
+
} finally
request.apiLocalCompleteTimeMs = SystemTime.milliseconds
}
@@ -152,13 +156,19 @@ class KafkaApis(val requestChannel: RequestChannel,
// ensureTopicExists is only for client facing requests
// We can't have the ensureTopicExists check here since the controller sends it as an advisory to all brokers so they
// stop serving data to clients for the topic being deleted
- val stopReplicaRequest = request.requestObj.asInstanceOf[StopReplicaRequest]
+ val stopReplicaRequest = request.body.asInstanceOf[StopReplicaRequest]
- authorizeClusterAction(request)
+ val responseHeader = new ResponseHeader(request.header.correlationId)
+ val response =
+ if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
+ val (result, error) = replicaManager.stopReplicas(stopReplicaRequest)
+ new StopReplicaResponse(error, result.asInstanceOf[Map[TopicPartition, JShort]].asJava)
+ } else {
+ val result = stopReplicaRequest.partitions.asScala.map((_, new JShort(Errors.CLUSTER_AUTHORIZATION_FAILED.code))).toMap
+ new StopReplicaResponse(Errors.CLUSTER_AUTHORIZATION_FAILED.code, result.asJava)
+ }
- val (response, error) = replicaManager.stopReplicas(stopReplicaRequest)
- val stopReplicaResponse = new StopReplicaResponse(stopReplicaRequest.correlationId, response.toMap, error)
- requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, stopReplicaResponse)))
+ requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, response)))
replicaManager.replicaFetcherManager.shutdownIdleFetcherThreads()
}
@@ -926,5 +936,4 @@ class KafkaApis(val requestChannel: RequestChannel,
if (!authorize(request.session, ClusterAction, Resource.ClusterResource))
throw new ClusterAuthorizationException(s"Request $request is not authorized.")
}
-
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/00114dae/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 3c2fa36..75e6bae 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -33,6 +33,7 @@ import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.LeaderAndIsrRequest
+import org.apache.kafka.common.requests.StopReplicaRequest
import org.apache.kafka.common.utils.{Time => JTime}
import scala.collection._
@@ -241,21 +242,21 @@ class ReplicaManager(val config: KafkaConfig,
errorCode
}
- def stopReplicas(stopReplicaRequest: StopReplicaRequest): (mutable.Map[TopicAndPartition, Short], Short) = {
+ def stopReplicas(stopReplicaRequest: StopReplicaRequest): (mutable.Map[TopicPartition, Short], Short) = {
replicaStateChangeLock synchronized {
- val responseMap = new collection.mutable.HashMap[TopicAndPartition, Short]
- if(stopReplicaRequest.controllerEpoch < controllerEpoch) {
- stateChangeLogger.warn("Broker %d received stop replica request from an old controller epoch %d."
- .format(localBrokerId, stopReplicaRequest.controllerEpoch) +
- " Latest known controller epoch is %d " + controllerEpoch)
+ val responseMap = new collection.mutable.HashMap[TopicPartition, Short]
+ if(stopReplicaRequest.controllerEpoch() < controllerEpoch) {
+ stateChangeLogger.warn("Broker %d received stop replica request from an old controller epoch %d. Latest known controller epoch is %d"
+ .format(localBrokerId, stopReplicaRequest.controllerEpoch, controllerEpoch))
(responseMap, ErrorMapping.StaleControllerEpochCode)
} else {
+ val partitions = stopReplicaRequest.partitions.asScala
controllerEpoch = stopReplicaRequest.controllerEpoch
// First stop fetchers for all partitions, then stop the corresponding replicas
- replicaFetcherManager.removeFetcherForPartitions(stopReplicaRequest.partitions.map(r => TopicAndPartition(r.topic, r.partition)))
- for(topicAndPartition <- stopReplicaRequest.partitions){
- val errorCode = stopReplica(topicAndPartition.topic, topicAndPartition.partition, stopReplicaRequest.deletePartitions)
- responseMap.put(topicAndPartition, errorCode)
+ replicaFetcherManager.removeFetcherForPartitions(partitions.map(r => TopicAndPartition(r.topic, r.partition)))
+ for(topicPartition <- partitions){
+ val errorCode = stopReplica(topicPartition.topic, topicPartition.partition, stopReplicaRequest.deletePartitions)
+ responseMap.put(topicPartition, errorCode)
}
(responseMap, ErrorMapping.NoError)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/00114dae/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index c645102..51d6c91 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -116,17 +116,6 @@ object SerializationTestUtils {
TopicAndPartition(topic1,3) -> partitionStateInfo3
)
- def createTestStopReplicaRequest() : StopReplicaRequest = {
- new StopReplicaRequest(controllerId = 0, controllerEpoch = 1, correlationId = 0, deletePartitions = true,
- partitions = collection.immutable.Set(TopicAndPartition(topic1, 0),TopicAndPartition(topic2, 0)))
- }
-
- def createTestStopReplicaResponse() : StopReplicaResponse = {
- val responseMap = Map((TopicAndPartition(topic1, 0), ErrorMapping.NoError),
- (TopicAndPartition(topic2, 0), ErrorMapping.NoError))
- new StopReplicaResponse(0, responseMap.toMap)
- }
-
def createTestProducerRequest: ProducerRequest = {
new ProducerRequest(1, "client 1", 0, 1000, topicDataProducerRequest)
}
@@ -239,8 +228,6 @@ object SerializationTestUtils {
}
class RequestResponseSerializationTest extends JUnitSuite {
- private val stopReplicaRequest = SerializationTestUtils.createTestStopReplicaRequest
- private val stopReplicaResponse = SerializationTestUtils.createTestStopReplicaResponse
private val producerRequest = SerializationTestUtils.createTestProducerRequest
private val producerResponse = SerializationTestUtils.createTestProducerResponse
private val fetchRequest = SerializationTestUtils.createTestFetchRequest
@@ -266,8 +253,7 @@ class RequestResponseSerializationTest extends JUnitSuite {
def testSerializationAndDeserialization() {
val requestsAndResponses =
- collection.immutable.Seq(stopReplicaRequest,
- stopReplicaResponse, producerRequest, producerResponse,
+ collection.immutable.Seq(producerRequest, producerResponse,
fetchRequest, offsetRequest, offsetResponse, topicMetadataRequest,
topicMetadataResponse,
offsetCommitRequestV0, offsetCommitRequestV1, offsetCommitRequestV2,