Posted to commits@kafka.apache.org by ju...@apache.org on 2016/01/07 01:08:19 UTC

kafka git commit: KAFKA-2072; Replace StopReplica Request/Response with their org.apache.kafka.common.requests equivalents

Repository: kafka
Updated Branches:
  refs/heads/trunk d9cabfde6 -> 00114dae7


KAFKA-2072; Replace StopReplica Request/Response with their org.apache.kafka.common.requests equivalents

Author: David Jacot <da...@gmail.com>

Reviewers: Ismael Juma <is...@juma.me.uk>, Grant Henke <gr...@gmail.com>, Jun Rao <ju...@gmail.com>

Closes #196 from dajac/KAFKA-2072-part-2


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/00114dae
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/00114dae
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/00114dae

Branch: refs/heads/trunk
Commit: 00114dae7a0ecb361aa5b843d5c7a95381294bd2
Parents: d9cabfd
Author: David Jacot <da...@gmail.com>
Authored: Wed Jan 6 16:08:16 2016 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Wed Jan 6 16:08:16 2016 -0800

----------------------------------------------------------------------
 .../scala/kafka/api/StopReplicaRequest.scala    | 126 -------------------
 .../scala/kafka/api/StopReplicaResponse.scala   |  75 -----------
 .../scala/kafka/network/RequestChannel.scala    |   1 -
 .../src/main/scala/kafka/server/KafkaApis.scala |  29 +++--
 .../scala/kafka/server/ReplicaManager.scala     |  21 ++--
 .../api/RequestResponseSerializationTest.scala  |  16 +--
 6 files changed, 31 insertions(+), 237 deletions(-)
----------------------------------------------------------------------
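[Editor's note, not part of the commit message: the deleted kafka.api.StopReplicaRequest and
kafka.api.StopReplicaResponse classes carried hand-rolled ByteBuffer serialization; after this
change KafkaApis and ReplicaManager work with the schema-based Java classes in
org.apache.kafka.common.requests. Below is a minimal sketch of how a handler builds the new-style
response, based on the KafkaApis hunk further down. The imports and the partitionErrors map are
illustrative assumptions; only the constructor shape (an error code plus a
java.util.Map[TopicPartition, java.lang.Short]) is taken from the diff.]

    import java.lang.{Short => JShort}
    import scala.collection.JavaConverters._
    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.protocol.Errors
    import org.apache.kafka.common.requests.StopReplicaResponse

    // Per-partition error codes, keyed by the common TopicPartition class rather than
    // the old kafka.common.TopicAndPartition (illustrative data).
    val partitionErrors: Map[TopicPartition, JShort] =
      Map(new TopicPartition("topic1", 0) -> new JShort(Errors.NONE.code))

    // Top-level error code plus the per-partition map, mirroring handleStopReplicaRequest below.
    val response = new StopReplicaResponse(Errors.NONE.code, partitionErrors.asJava)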


http://git-wip-us.apache.org/repos/asf/kafka/blob/00114dae/core/src/main/scala/kafka/api/StopReplicaRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/StopReplicaRequest.scala b/core/src/main/scala/kafka/api/StopReplicaRequest.scala
deleted file mode 100644
index 03c7f3d..0000000
--- a/core/src/main/scala/kafka/api/StopReplicaRequest.scala
+++ /dev/null
@@ -1,126 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.api
-
-import java.nio._
-import kafka.api.ApiUtils._
-import kafka.network.{RequestOrResponseSend, RequestChannel, InvalidRequestException}
-import kafka.common.{TopicAndPartition, ErrorMapping}
-import kafka.network.RequestChannel.Response
-import kafka.utils.Logging
-import org.apache.kafka.common.protocol.ApiKeys
-import collection.Set
-
-
-object StopReplicaRequest extends Logging {
-  val CurrentVersion = 0.shortValue
-  val DefaultClientId = ""
-  val DefaultAckTimeout = 100
-
-  def readFrom(buffer: ByteBuffer): StopReplicaRequest = {
-    val versionId = buffer.getShort
-    val correlationId = buffer.getInt
-    val clientId = readShortString(buffer)
-    val controllerId = buffer.getInt
-    val controllerEpoch = buffer.getInt
-    val deletePartitions = buffer.get match {
-      case 1 => true
-      case 0 => false
-      case x =>
-        throw new InvalidRequestException("Invalid byte %d in delete partitions field. (Assuming false.)".format(x))
-    }
-    val topicPartitionPairCount = buffer.getInt
-    val topicPartitionPairSet = new collection.mutable.HashSet[TopicAndPartition]()
-    (1 to topicPartitionPairCount) foreach { _ =>
-      topicPartitionPairSet.add(TopicAndPartition(readShortString(buffer), buffer.getInt))
-    }
-    StopReplicaRequest(versionId, correlationId, clientId, controllerId, controllerEpoch,
-                       deletePartitions, topicPartitionPairSet.toSet)
-  }
-}
-
-case class StopReplicaRequest(versionId: Short,
-                              correlationId: Int,
-                              clientId: String,
-                              controllerId: Int,
-                              controllerEpoch: Int,
-                              deletePartitions: Boolean,
-                              partitions: Set[TopicAndPartition])
-        extends RequestOrResponse(Some(ApiKeys.STOP_REPLICA.id)) {
-
-  def this(deletePartitions: Boolean, partitions: Set[TopicAndPartition], controllerId: Int, controllerEpoch: Int, correlationId: Int) = {
-    this(StopReplicaRequest.CurrentVersion, correlationId, StopReplicaRequest.DefaultClientId,
-         controllerId, controllerEpoch, deletePartitions, partitions)
-  }
-
-  def writeTo(buffer: ByteBuffer) {
-    buffer.putShort(versionId)
-    buffer.putInt(correlationId)
-    writeShortString(buffer, clientId)
-    buffer.putInt(controllerId)
-    buffer.putInt(controllerEpoch)
-    buffer.put(if (deletePartitions) 1.toByte else 0.toByte)
-    buffer.putInt(partitions.size)
-    for (topicAndPartition <- partitions) {
-      writeShortString(buffer, topicAndPartition.topic)
-      buffer.putInt(topicAndPartition.partition)
-    }
-  }
-
-  def sizeInBytes(): Int = {
-    var size =
-      2 + /* versionId */
-      4 + /* correlation id */
-      ApiUtils.shortStringLength(clientId) +
-      4 + /* controller id*/
-      4 + /* controller epoch */
-      1 + /* deletePartitions */
-      4 /* partition count */
-    for (topicAndPartition <- partitions){
-      size += (ApiUtils.shortStringLength(topicAndPartition.topic)) +
-              4 /* partition id */
-    }
-    size
-  }
-
-  override def toString(): String = {
-    describe(true)
-  }
-
-  override  def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
-    val responseMap = partitions.map {
-      case topicAndPartition => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
-    }.toMap
-    val errorResponse = StopReplicaResponse(correlationId, responseMap)
-    requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
-  }
-
-  override def describe(details: Boolean): String = {
-    val stopReplicaRequest = new StringBuilder
-    stopReplicaRequest.append("Name: " + this.getClass.getSimpleName)
-    stopReplicaRequest.append("; Version: " + versionId)
-    stopReplicaRequest.append("; CorrelationId: " + correlationId)
-    stopReplicaRequest.append("; ClientId: " + clientId)
-    stopReplicaRequest.append("; DeletePartitions: " + deletePartitions)
-    stopReplicaRequest.append("; ControllerId: " + controllerId)
-    stopReplicaRequest.append("; ControllerEpoch: " + controllerEpoch)
-    if(details)
-      stopReplicaRequest.append("; Partitions: " + partitions.mkString(","))
-    stopReplicaRequest.toString()
-  }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/00114dae/core/src/main/scala/kafka/api/StopReplicaResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/StopReplicaResponse.scala b/core/src/main/scala/kafka/api/StopReplicaResponse.scala
deleted file mode 100644
index 2fc3c95..0000000
--- a/core/src/main/scala/kafka/api/StopReplicaResponse.scala
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.api
-
-import java.nio.ByteBuffer
-import collection.mutable.HashMap
-import collection.immutable.Map
-import kafka.common.{TopicAndPartition, ErrorMapping}
-import kafka.api.ApiUtils._
-
-
-object StopReplicaResponse {
-  def readFrom(buffer: ByteBuffer): StopReplicaResponse = {
-    val correlationId = buffer.getInt
-    val errorCode = buffer.getShort
-    val numEntries = buffer.getInt
-
-    val responseMap = new HashMap[TopicAndPartition, Short]()
-    for (i<- 0 until numEntries){
-      val topic = readShortString(buffer)
-      val partition = buffer.getInt
-      val partitionErrorCode = buffer.getShort()
-      responseMap.put(TopicAndPartition(topic, partition), partitionErrorCode)
-    }
-    new StopReplicaResponse(correlationId, responseMap.toMap, errorCode)
-  }
-}
-
-
-case class StopReplicaResponse(correlationId: Int,
-                               responseMap: Map[TopicAndPartition, Short],
-                               errorCode: Short = ErrorMapping.NoError)
-    extends RequestOrResponse() {
-  def sizeInBytes(): Int ={
-    var size =
-      4 /* correlation id */ + 
-      2 /* error code */ +
-      4 /* number of responses */
-    for ((key, value) <- responseMap) {
-      size +=
-        2 + key.topic.length /* topic */ +
-        4 /* partition */ +
-        2 /* error code for this partition */
-    }
-    size
-  }
-
-  def writeTo(buffer: ByteBuffer) {
-    buffer.putInt(correlationId)
-    buffer.putShort(errorCode)
-    buffer.putInt(responseMap.size)
-    for ((topicAndPartition, errorCode) <- responseMap){
-      writeShortString(buffer, topicAndPartition.topic)
-      buffer.putInt(topicAndPartition.partition)
-      buffer.putShort(errorCode)
-    }
-  }
-
-  override def describe(details: Boolean):String = { toString }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/00114dae/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 1ab51da..998f51a 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -69,7 +69,6 @@ object RequestChannel extends Logging {
       Map(ApiKeys.PRODUCE.id -> ProducerRequest.readFrom,
         ApiKeys.FETCH.id -> FetchRequest.readFrom,
         ApiKeys.METADATA.id -> TopicMetadataRequest.readFrom,
-        ApiKeys.STOP_REPLICA.id -> StopReplicaRequest.readFrom,
         ApiKeys.UPDATE_METADATA_KEY.id -> UpdateMetadataRequest.readFrom,
         ApiKeys.CONTROLLED_SHUTDOWN_KEY.id -> ControlledShutdownRequest.readFrom,
         ApiKeys.OFFSET_COMMIT.id -> OffsetCommitRequest.readFrom,

http://git-wip-us.apache.org/repos/asf/kafka/blob/00114dae/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index cbf5031..5fda0eb 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -36,7 +36,8 @@ import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol}
 import org.apache.kafka.common.requests.{ListOffsetRequest, ListOffsetResponse, GroupCoordinatorRequest, GroupCoordinatorResponse, ListGroupsResponse,
 DescribeGroupsRequest, DescribeGroupsResponse, HeartbeatRequest, HeartbeatResponse, JoinGroupRequest, JoinGroupResponse,
-LeaveGroupRequest, LeaveGroupResponse, ResponseHeader, ResponseSend, SyncGroupRequest, SyncGroupResponse, LeaderAndIsrRequest, LeaderAndIsrResponse}
+LeaveGroupRequest, LeaveGroupResponse, ResponseHeader, ResponseSend, SyncGroupRequest, SyncGroupResponse, LeaderAndIsrRequest, LeaderAndIsrResponse,
+StopReplicaRequest, StopReplicaResponse}
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.{TopicPartition, Node}
 
@@ -90,9 +91,10 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
     } catch {
       case e: Throwable =>
-        if ( request.requestObj != null)
+        if (request.requestObj != null) {
           request.requestObj.handleError(e, requestChannel, request)
-        else {
+          error("Error when handling request %s".format(request.requestObj), e)
+        } else {
           val response = request.body.getErrorResponse(request.header.apiVersion, e)
           val respHeader = new ResponseHeader(request.header.correlationId)
 
@@ -102,8 +104,10 @@ class KafkaApis(val requestChannel: RequestChannel,
             requestChannel.closeConnection(request.processor, request)
           else
             requestChannel.sendResponse(new Response(request, new ResponseSend(request.connectionId, respHeader, response)))
+
+          error("Error when handling request %s".format(request.body), e)
         }
-        error("error when handling request %s".format(request.requestObj), e)
+
     } finally
       request.apiLocalCompleteTimeMs = SystemTime.milliseconds
   }
@@ -152,13 +156,19 @@ class KafkaApis(val requestChannel: RequestChannel,
     // ensureTopicExists is only for client facing requests
     // We can't have the ensureTopicExists check here since the controller sends it as an advisory to all brokers so they
     // stop serving data to clients for the topic being deleted
-    val stopReplicaRequest = request.requestObj.asInstanceOf[StopReplicaRequest]
+    val stopReplicaRequest = request.body.asInstanceOf[StopReplicaRequest]
 
-    authorizeClusterAction(request)
+    val responseHeader = new ResponseHeader(request.header.correlationId)
+    val response =
+      if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
+        val (result, error) = replicaManager.stopReplicas(stopReplicaRequest)
+        new StopReplicaResponse(error, result.asInstanceOf[Map[TopicPartition, JShort]].asJava)
+      } else {
+        val result = stopReplicaRequest.partitions.asScala.map((_, new JShort(Errors.CLUSTER_AUTHORIZATION_FAILED.code))).toMap
+        new StopReplicaResponse(Errors.CLUSTER_AUTHORIZATION_FAILED.code, result.asJava)
+      }
 
-    val (response, error) = replicaManager.stopReplicas(stopReplicaRequest)
-    val stopReplicaResponse = new StopReplicaResponse(stopReplicaRequest.correlationId, response.toMap, error)
-    requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, stopReplicaResponse)))
+    requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, response)))
     replicaManager.replicaFetcherManager.shutdownIdleFetcherThreads()
   }
 
@@ -926,5 +936,4 @@ class KafkaApis(val requestChannel: RequestChannel,
     if (!authorize(request.session, ClusterAction, Resource.ClusterResource))
       throw new ClusterAuthorizationException(s"Request $request is not authorized.")
   }
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/00114dae/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 3c2fa36..75e6bae 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -33,6 +33,7 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.LeaderAndIsrRequest
+import org.apache.kafka.common.requests.StopReplicaRequest
 import org.apache.kafka.common.utils.{Time => JTime}
 
 import scala.collection._
@@ -241,21 +242,21 @@ class ReplicaManager(val config: KafkaConfig,
     errorCode
   }
 
-  def stopReplicas(stopReplicaRequest: StopReplicaRequest): (mutable.Map[TopicAndPartition, Short], Short) = {
+  def stopReplicas(stopReplicaRequest: StopReplicaRequest): (mutable.Map[TopicPartition, Short], Short) = {
     replicaStateChangeLock synchronized {
-      val responseMap = new collection.mutable.HashMap[TopicAndPartition, Short]
-      if(stopReplicaRequest.controllerEpoch < controllerEpoch) {
-        stateChangeLogger.warn("Broker %d received stop replica request from an old controller epoch %d."
-          .format(localBrokerId, stopReplicaRequest.controllerEpoch) +
-          " Latest known controller epoch is %d " + controllerEpoch)
+      val responseMap = new collection.mutable.HashMap[TopicPartition, Short]
+      if(stopReplicaRequest.controllerEpoch() < controllerEpoch) {
+        stateChangeLogger.warn("Broker %d received stop replica request from an old controller epoch %d. Latest known controller epoch is %d"
+          .format(localBrokerId, stopReplicaRequest.controllerEpoch, controllerEpoch))
         (responseMap, ErrorMapping.StaleControllerEpochCode)
       } else {
+        val partitions = stopReplicaRequest.partitions.asScala
         controllerEpoch = stopReplicaRequest.controllerEpoch
         // First stop fetchers for all partitions, then stop the corresponding replicas
-        replicaFetcherManager.removeFetcherForPartitions(stopReplicaRequest.partitions.map(r => TopicAndPartition(r.topic, r.partition)))
-        for(topicAndPartition <- stopReplicaRequest.partitions){
-          val errorCode = stopReplica(topicAndPartition.topic, topicAndPartition.partition, stopReplicaRequest.deletePartitions)
-          responseMap.put(topicAndPartition, errorCode)
+        replicaFetcherManager.removeFetcherForPartitions(partitions.map(r => TopicAndPartition(r.topic, r.partition)))
+        for(topicPartition <- partitions){
+          val errorCode = stopReplica(topicPartition.topic, topicPartition.partition, stopReplicaRequest.deletePartitions)
+          responseMap.put(topicPartition, errorCode)
         }
         (responseMap, ErrorMapping.NoError)
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/00114dae/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index c645102..51d6c91 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -116,17 +116,6 @@ object SerializationTestUtils {
     TopicAndPartition(topic1,3) -> partitionStateInfo3
   )
 
-  def createTestStopReplicaRequest() : StopReplicaRequest = {
-    new StopReplicaRequest(controllerId = 0, controllerEpoch = 1, correlationId = 0, deletePartitions = true,
-                           partitions = collection.immutable.Set(TopicAndPartition(topic1, 0),TopicAndPartition(topic2, 0)))
-  }
-
-  def createTestStopReplicaResponse() : StopReplicaResponse = {
-    val responseMap = Map((TopicAndPartition(topic1, 0), ErrorMapping.NoError),
-                          (TopicAndPartition(topic2, 0), ErrorMapping.NoError))
-    new StopReplicaResponse(0, responseMap.toMap)
-  }
-
   def createTestProducerRequest: ProducerRequest = {
     new ProducerRequest(1, "client 1", 0, 1000, topicDataProducerRequest)
   }
@@ -239,8 +228,6 @@ object SerializationTestUtils {
 }
 
 class RequestResponseSerializationTest extends JUnitSuite {
-  private val stopReplicaRequest = SerializationTestUtils.createTestStopReplicaRequest
-  private val stopReplicaResponse = SerializationTestUtils.createTestStopReplicaResponse
   private val producerRequest = SerializationTestUtils.createTestProducerRequest
   private val producerResponse = SerializationTestUtils.createTestProducerResponse
   private val fetchRequest = SerializationTestUtils.createTestFetchRequest
@@ -266,8 +253,7 @@ class RequestResponseSerializationTest extends JUnitSuite {
   def testSerializationAndDeserialization() {
 
     val requestsAndResponses =
-      collection.immutable.Seq(stopReplicaRequest,
-                               stopReplicaResponse, producerRequest, producerResponse,
+      collection.immutable.Seq(producerRequest, producerResponse,
                                fetchRequest, offsetRequest, offsetResponse, topicMetadataRequest,
                                topicMetadataResponse,
                                offsetCommitRequestV0, offsetCommitRequestV1, offsetCommitRequestV2,