You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/05/18 23:14:29 UTC
kafka git commit: MINOR: Small refactor of request quotas handling in KafkaApis
Repository: kafka
Updated Branches:
refs/heads/trunk 56623efd7 -> b661d3b8a
MINOR: Small refactor of request quotas handling in KafkaApis
- Avoid unnecessary inner methods
- Remove redundant parameter in `sendResponseExemptThrottle`
- Go through `sendResponseExemptThrottle` for produce requests with acks=0
- Tighten how we handle cases where there’s no response
Author: Ismael Juma <is...@juma.me.uk>
Reviewers: Rajini Sivaram <ra...@googlemail.com>
Closes #3087 from ijuma/kafka-apis-improvements
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b661d3b8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b661d3b8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b661d3b8
Branch: refs/heads/trunk
Commit: b661d3b8ab37fc63b44145ba29d129b07c1582b9
Parents: 56623ef
Author: Ismael Juma <is...@juma.me.uk>
Authored: Fri May 19 00:14:08 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Fri May 19 00:14:08 2017 +0100
----------------------------------------------------------------------
.../kafka/api/ControlledShutdownRequest.scala | 2 +-
.../src/main/scala/kafka/api/FetchRequest.scala | 2 +-
.../kafka/api/GroupCoordinatorRequest.scala | 4 +-
.../scala/kafka/api/OffsetCommitRequest.scala | 2 +-
.../scala/kafka/api/OffsetFetchRequest.scala | 2 +-
.../main/scala/kafka/api/OffsetRequest.scala | 2 +-
.../main/scala/kafka/api/ProducerRequest.scala | 4 +-
.../scala/kafka/api/TopicMetadataRequest.scala | 2 +-
.../scala/kafka/network/RequestChannel.scala | 40 +-
.../main/scala/kafka/network/SocketServer.scala | 12 +-
.../src/main/scala/kafka/server/KafkaApis.scala | 390 +++++++++----------
.../unit/kafka/network/SocketServerTest.scala | 16 +-
12 files changed, 231 insertions(+), 247 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/b661d3b8/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala b/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
index 46ae1e7..a0ad6cf 100644
--- a/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
+++ b/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
@@ -69,7 +69,7 @@ case class ControlledShutdownRequest(versionId: Short,
override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
val errorResponse = ControlledShutdownResponse(correlationId, Errors.forException(e), Set.empty[TopicAndPartition])
- requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
+ requestChannel.sendResponse(Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
}
override def describe(details: Boolean = false): String = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/b661d3b8/core/src/main/scala/kafka/api/FetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala
index 39da605..60284f7 100644
--- a/core/src/main/scala/kafka/api/FetchRequest.scala
+++ b/core/src/main/scala/kafka/api/FetchRequest.scala
@@ -208,7 +208,7 @@ case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
}
val errorResponse = new JFetchResponse(responseData, 0)
// Magic value does not matter here because the message set is empty
- requestChannel.sendResponse(new RequestChannel.Response(request, errorResponse))
+ requestChannel.sendResponse(RequestChannel.Response(request, errorResponse))
}
override def describe(details: Boolean): String = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/b661d3b8/core/src/main/scala/kafka/api/GroupCoordinatorRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/GroupCoordinatorRequest.scala b/core/src/main/scala/kafka/api/GroupCoordinatorRequest.scala
index d99474d..b3616fa 100644
--- a/core/src/main/scala/kafka/api/GroupCoordinatorRequest.scala
+++ b/core/src/main/scala/kafka/api/GroupCoordinatorRequest.scala
@@ -65,7 +65,7 @@ case class GroupCoordinatorRequest(group: String,
override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
// return ConsumerCoordinatorNotAvailable for all uncaught errors
val errorResponse = GroupCoordinatorResponse(None, Errors.COORDINATOR_NOT_AVAILABLE, correlationId)
- requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
+ requestChannel.sendResponse(Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
}
def describe(details: Boolean) = {
@@ -77,4 +77,4 @@ case class GroupCoordinatorRequest(group: String,
consumerMetadataRequest.append("; Group: " + group)
consumerMetadataRequest.toString()
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/b661d3b8/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
index b9693f6..e598500 100644
--- a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
@@ -165,7 +165,7 @@ case class OffsetCommitRequest(groupId: String,
val commitStatus = requestInfo.mapValues(_ => error)
val commitResponse = OffsetCommitResponse(commitStatus, correlationId)
- requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, commitResponse)))
+ requestChannel.sendResponse(Response(request, new RequestOrResponseSend(request.connectionId, commitResponse)))
}
override def describe(details: Boolean): String = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/b661d3b8/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetFetchRequest.scala b/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
index 33d3795..310860f 100644
--- a/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
@@ -104,7 +104,7 @@ case class OffsetFetchRequest(groupId: String,
}
val errorResponse = OffsetFetchResponse(requestInfo=responseMap, correlationId=correlationId, error=thrownError)
- requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
+ requestChannel.sendResponse(Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
}
override def describe(details: Boolean): String = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/b661d3b8/core/src/main/scala/kafka/api/OffsetRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetRequest.scala b/core/src/main/scala/kafka/api/OffsetRequest.scala
index 879d60d..876022c 100644
--- a/core/src/main/scala/kafka/api/OffsetRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetRequest.scala
@@ -118,7 +118,7 @@ case class OffsetRequest(requestInfo: Map[TopicAndPartition, PartitionOffsetRequ
(topicAndPartition, PartitionOffsetsResponse(Errors.forException(e), Nil))
}
val errorResponse = OffsetResponse(correlationId, partitionOffsetResponseMap)
- requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
+ requestChannel.sendResponse(Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
}
override def describe(details: Boolean): String = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/b661d3b8/core/src/main/scala/kafka/api/ProducerRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ProducerRequest.scala b/core/src/main/scala/kafka/api/ProducerRequest.scala
index bd48388..38fef5b 100644
--- a/core/src/main/scala/kafka/api/ProducerRequest.scala
+++ b/core/src/main/scala/kafka/api/ProducerRequest.scala
@@ -130,14 +130,14 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion,
override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
if (request.body[org.apache.kafka.common.requests.ProduceRequest].acks == 0) {
- requestChannel.closeConnection(request.processor, request)
+ requestChannel.sendResponse(new RequestChannel.Response(request, None, RequestChannel.CloseConnectionAction))
}
else {
val producerResponseStatus = data.map { case (topicAndPartition, _) =>
(topicAndPartition, ProducerResponseStatus(Errors.forException(e), -1l, Message.NoTimestamp))
}
val errorResponse = ProducerResponse(correlationId, producerResponseStatus)
- requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
+ requestChannel.sendResponse(Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/b661d3b8/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
index 403152a..6bbcab5 100644
--- a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
+++ b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
@@ -64,7 +64,7 @@ case class TopicMetadataRequest(versionId: Short,
topic => TopicMetadata(topic, Nil, Errors.forException(e))
}
val errorResponse = TopicMetadataResponse(Seq(), topicMetadata, correlationId)
- requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
+ requestChannel.sendResponse(Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
}
override def describe(details: Boolean): String = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/b661d3b8/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 2d41869..bb19346 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -37,7 +37,7 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.Time
import org.apache.log4j.Logger
-import scala.reflect.{ClassTag, classTag}
+import scala.reflect.ClassTag
object RequestChannel extends Logging {
val AllDone = Request(processor = 1, connectionId = "2", Session(KafkaPrincipal.ANONYMOUS, InetAddress.getLocalHost),
@@ -195,15 +195,27 @@ object RequestChannel extends Logging {
}
}
- case class Response(processor: Int, request: Request, responseSend: Send, responseAction: ResponseAction) {
+ object Response {
+
+ def apply(request: Request, responseSend: Send): Response = {
+ require(request != null, "request should be non null")
+ require(responseSend != null, "responseSend should be non null")
+ new Response(request, Some(responseSend), SendAction)
+ }
+
+ def apply(request: Request, response: AbstractResponse): Response = {
+ require(request != null, "request should be non null")
+ require(response != null, "response should be non null")
+ apply(request, response.toSend(request.connectionId, request.header))
+ }
+
+ }
+
+ case class Response(request: Request, responseSend: Option[Send], responseAction: ResponseAction) {
request.responseCompleteTimeNanos = Time.SYSTEM.nanoseconds
if (request.apiLocalCompleteTimeNanos == -1L) request.apiLocalCompleteTimeNanos = Time.SYSTEM.nanoseconds
- def this(request: Request, responseSend: Send) =
- this(request.processor, request, responseSend, if (responseSend == null) NoOpAction else SendAction)
-
- def this(request: Request, response: AbstractResponse) =
- this(request, response.toSend(request.connectionId, request.header))
+ def processor: Int = request.processor
}
trait ResponseAction
@@ -251,20 +263,6 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe
onResponse(response.processor)
}
- /** No operation to take for the request, need to read more over the network */
- def noOperation(processor: Int, request: RequestChannel.Request) {
- responseQueues(processor).put(RequestChannel.Response(processor, request, null, RequestChannel.NoOpAction))
- for(onResponse <- responseListeners)
- onResponse(processor)
- }
-
- /** Close the connection for the request */
- def closeConnection(processor: Int, request: RequestChannel.Request) {
- responseQueues(processor).put(RequestChannel.Response(processor, request, null, RequestChannel.CloseConnectionAction))
- for(onResponse <- responseListeners)
- onResponse(processor)
- }
-
/** Get the next request or block until specified time has elapsed */
def receiveRequest(timeout: Long): RequestChannel.Request =
requestQueue.poll(timeout, TimeUnit.MILLISECONDS)
http://git-wip-us.apache.org/repos/asf/kafka/blob/b661d3b8/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 48d0233..414557e 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -33,7 +33,7 @@ import kafka.server.KafkaConfig
import kafka.utils._
import org.apache.kafka.common.errors.InvalidRequestException
import org.apache.kafka.common.metrics._
-import org.apache.kafka.common.network.{ChannelBuilders, KafkaChannel, ListenerName, Selectable, Selector => KSelector}
+import org.apache.kafka.common.network.{ChannelBuilders, KafkaChannel, ListenerName, Selectable, Send, Selector => KSelector}
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.protocol.SecurityProtocol
import org.apache.kafka.common.protocol.types.SchemaException
@@ -466,7 +466,9 @@ private[kafka] class Processor(val id: Int,
if (selector.channel(channelId) != null || selector.closingChannel(channelId) != null)
selector.unmute(channelId)
case RequestChannel.SendAction =>
- sendResponse(curr)
+ val responseSend = curr.responseSend.getOrElse(
+ throw new IllegalStateException(s"responseSend must be defined for SendAction, response: $curr"))
+ sendResponse(curr, responseSend)
case RequestChannel.CloseConnectionAction =>
updateRequestMetrics(curr.request)
trace("Closing socket connection actively according to the response code.")
@@ -479,16 +481,16 @@ private[kafka] class Processor(val id: Int,
}
/* `protected` for test usage */
- protected[network] def sendResponse(response: RequestChannel.Response) {
+ protected[network] def sendResponse(response: RequestChannel.Response, responseSend: Send) {
trace(s"Socket server received response to send, registering for write and sending data: $response")
- val channel = selector.channel(response.responseSend.destination)
+ val channel = selector.channel(responseSend.destination)
// `channel` can be null if the selector closed the connection because it was idle for too long
if (channel == null) {
warn(s"Attempting to send response via channel for which there is no open connection, connection id $id")
response.request.updateRequestMetrics(0L)
}
else {
- selector.send(response.responseSend)
+ selector.send(responseSend)
inflightResponses += (response.request.connectionId -> response)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/b661d3b8/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index db15f72..1346fb3 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -34,7 +34,6 @@ import kafka.coordinator.group.{GroupCoordinator, JoinGroupResult}
import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinator}
import kafka.log.{Log, LogManager, TimestampOffset}
import kafka.network.{RequestChannel, RequestOrResponseSend}
-import kafka.network.RequestChannel.{Response, Session}
import kafka.security.auth._
import kafka.utils.{Exit, Logging, ZKGroupTopicDirs, ZkUtils}
import org.apache.kafka.common.errors._
@@ -167,11 +166,11 @@ class KafkaApis(val requestChannel: RequestChannel,
if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
val result = replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest, onLeadershipChange)
val leaderAndIsrResponse = new LeaderAndIsrResponse(result.error, result.responseMap.asJava)
- sendResponseExemptThrottle(request, new Response(request, leaderAndIsrResponse))
+ sendResponseExemptThrottle(RequestChannel.Response(request, leaderAndIsrResponse))
} else {
val result = leaderAndIsrRequest.partitionStates.asScala.keys.map((_, Errors.CLUSTER_AUTHORIZATION_FAILED)).toMap
- def createResponse(throttleTimeMs: Int): AbstractResponse = new LeaderAndIsrResponse(Errors.CLUSTER_AUTHORIZATION_FAILED, result.asJava)
- sendResponseMaybeThrottle(request, createResponse)
+ sendResponseMaybeThrottle(request, _ =>
+ new LeaderAndIsrResponse(Errors.CLUSTER_AUTHORIZATION_FAILED, result.asJava))
}
} catch {
case e: FatalExitError => throw e
@@ -201,11 +200,11 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
val response = new StopReplicaResponse(error, result.asJava)
- sendResponseExemptThrottle(request, new Response(request, response))
+ sendResponseExemptThrottle(RequestChannel.Response(request, response))
} else {
val result = stopReplicaRequest.partitions.asScala.map((_, Errors.CLUSTER_AUTHORIZATION_FAILED)).toMap
- def createResponse(throttleTimeMs: Int): AbstractResponse = new StopReplicaResponse(Errors.CLUSTER_AUTHORIZATION_FAILED, result.asJava)
- sendResponseMaybeThrottle(request, createResponse)
+ sendResponseMaybeThrottle(request, _ =>
+ new StopReplicaResponse(Errors.CLUSTER_AUTHORIZATION_FAILED, result.asJava))
}
replicaManager.replicaFetcherManager.shutdownIdleFetcherThreads()
@@ -225,10 +224,9 @@ class KafkaApis(val requestChannel: RequestChannel,
adminManager.tryCompleteDelayedTopicOperations(topic)
}
}
- sendResponseExemptThrottle(request, new Response(request, new UpdateMetadataResponse(Errors.NONE)))
+ sendResponseExemptThrottle(RequestChannel.Response(request, new UpdateMetadataResponse(Errors.NONE)))
} else {
- def createResponse(throttleTimeMs: Int): AbstractResponse = new UpdateMetadataResponse(Errors.CLUSTER_AUTHORIZATION_FAILED)
- sendResponseMaybeThrottle(request, createResponse)
+ sendResponseMaybeThrottle(request, _ => new UpdateMetadataResponse(Errors.CLUSTER_AUTHORIZATION_FAILED))
}
}
@@ -245,7 +243,8 @@ class KafkaApis(val requestChannel: RequestChannel,
case Success(partitionsRemaining) =>
val controlledShutdownResponse = new ControlledShutdownResponse(controlledShutdownRequest.correlationId,
Errors.NONE, partitionsRemaining)
- sendResponseExemptThrottle(request, new Response(request, new RequestOrResponseSend(request.connectionId, controlledShutdownResponse)))
+ sendResponseExemptThrottle(RequestChannel.Response(request,
+ new RequestOrResponseSend(request.connectionId, controlledShutdownResponse)))
case Failure(throwable) =>
sendResponseExemptThrottle(request, () => controlledShutdownRequest.handleError(throwable, requestChannel, request))
}
@@ -266,8 +265,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val results = offsetCommitRequest.offsetData.keySet.asScala.map { topicPartition =>
(topicPartition, error)
}.toMap
- def createResponse(throttleTimeMs: Int): AbstractResponse = new OffsetCommitResponse(throttleTimeMs, results.asJava)
- sendResponseMaybeThrottle(request, createResponse)
+ sendResponseMaybeThrottle(request, requestThrottleMs => new OffsetCommitResponse(requestThrottleMs, results.asJava))
} else {
val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = offsetCommitRequest.offsetData.asScala.toMap.partition {
case (topicPartition, _) =>
@@ -296,8 +294,8 @@ class KafkaApis(val requestChannel: RequestChannel,
s"on partition $topicPartition failed due to ${error.exceptionName}")
}
}
- def createResponse(throttleTimeMs: Int): AbstractResponse = new OffsetCommitResponse(throttleTimeMs, combinedCommitStatus.asJava)
- sendResponseMaybeThrottle(request, createResponse)
+ sendResponseMaybeThrottle(request, requestThrottleMs =>
+ new OffsetCommitResponse(requestThrottleMs, combinedCommitStatus.asJava))
}
if (authorizedTopics.isEmpty)
@@ -364,7 +362,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
- private def authorize(session: Session, operation: Operation, resource: Resource): Boolean =
+ private def authorize(session: RequestChannel.Session, operation: Operation, resource: Resource): Boolean =
authorizer.forall(_.authorize(session, operation, resource))
/**
@@ -375,9 +373,9 @@ class KafkaApis(val requestChannel: RequestChannel,
val numBytesAppended = request.header.toStruct.sizeOf + request.bodyAndSize.size
if (produceRequest.isTransactional && !authorize(request.session, Write, new Resource(ProducerTransactionalId, produceRequest.transactionalId())))
- sendResponseMaybeThrottle(request, (throttleMs: Int) => produceRequest.getErrorResponse(throttleMs, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception()))
+ sendResponseMaybeThrottle(request, (throttleMs: Int) => produceRequest.getErrorResponse(throttleMs, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception))
else if (produceRequest.isIdempotent && !authorize(request.session, Write, Resource.ProducerIdResource))
- sendResponseMaybeThrottle(request, (throttleMs: Int) => produceRequest.getErrorResponse(throttleMs, Errors.PRODUCER_ID_AUTHORIZATION_FAILED.exception()))
+ sendResponseMaybeThrottle(request, (throttleMs: Int) => produceRequest.getErrorResponse(throttleMs, Errors.PRODUCER_ID_AUTHORIZATION_FAILED.exception))
else {
val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) =
produceRequest.partitionRecordsOrFail.asScala.partition { case (tp, _) =>
@@ -413,25 +411,23 @@ class KafkaApis(val requestChannel: RequestChannel,
// no operation needed if producer request.required.acks = 0; however, if there is any error in handling
// the request, since no response is expected by the producer, the server will close socket server so that
// the producer client will know that some error has happened and will refresh its metadata
- if (errorInResponse) {
- val exceptionsSummary = mergedResponseStatus.map { case (topicPartition, status) =>
- topicPartition -> status.error.exceptionName
- }.mkString(", ")
- info(
- s"Closing connection due to error during produce request with correlation id ${request.header.correlationId} " +
- s"from client id ${request.header.clientId} with ack=0\n" +
- s"Topic and partition to exceptions: $exceptionsSummary"
- )
- requestChannel.closeConnection(request.processor, request)
- } else {
- requestChannel.noOperation(request.processor, request)
- }
+ val action =
+ if (errorInResponse) {
+ val exceptionsSummary = mergedResponseStatus.map { case (topicPartition, status) =>
+ topicPartition -> status.error.exceptionName
+ }.mkString(", ")
+ info(
+ s"Closing connection due to error during produce request with correlation id ${request.header.correlationId} " +
+ s"from client id ${request.header.clientId} with ack=0\n" +
+ s"Topic and partition to exceptions: $exceptionsSummary"
+ )
+ RequestChannel.CloseConnectionAction
+ } else RequestChannel.NoOpAction
+ sendResponseExemptThrottle(new RequestChannel.Response(request, None, action))
} else {
- def createResponseCallback(requestThrottleTimeMs: Int): AbstractResponse = {
- new ProduceResponse(mergedResponseStatus.asJava, bandwidthThrottleTimeMs + requestThrottleTimeMs)
- }
-
- sendResponseMaybeThrottle(request, createResponseCallback)
+ sendResponseMaybeThrottle(request, requestThrottleMs =>
+ new ProduceResponse(mergedResponseStatus.asJava, bandwidthThrottleTimeMs + requestThrottleMs)
+ )
}
}
@@ -552,16 +548,16 @@ class KafkaApis(val requestChannel: RequestChannel,
brokerTopicStats.updateBytesOut(topicPartition.topic, fetchRequest.isFromFollower, data.records.sizeInBytes)
}
- val responseSend = response.toSend(responseStruct, bandwidthThrottleTimeMs + requestThrottleTimeMs, request.connectionId, request.header)
- new RequestChannel.Response(request, responseSend)
- }
- def sendResponseCallback(requestThrottleTimeMs: Int) {
- requestChannel.sendResponse(createResponse(requestThrottleTimeMs))
+ val responseSend = response.toSend(responseStruct, bandwidthThrottleTimeMs + requestThrottleTimeMs,
+ request.connectionId, request.header)
+ RequestChannel.Response(request, responseSend)
}
+
if (fetchRequest.isFromFollower)
- sendResponseExemptThrottle(request, createResponse(0))
+ sendResponseExemptThrottle(createResponse(0))
else
- sendResponseMaybeThrottle(request, request.header.clientId, sendResponseCallback)
+ sendResponseMaybeThrottle(request, request.header.clientId, requestThrottleMs =>
+ requestChannel.sendResponse(createResponse(requestThrottleMs)))
}
// When this callback is triggered, the remote API call has completed.
@@ -628,8 +624,7 @@ class KafkaApis(val requestChannel: RequestChannel,
else
handleListOffsetRequestV1AndAbove(request)
- def createResponse(throttleTimeMs: Int): AbstractResponse = new ListOffsetResponse(throttleTimeMs, mergedResponseMap.asJava)
- sendResponseMaybeThrottle(request, createResponse)
+ sendResponseMaybeThrottle(request, requestThrottleMs => new ListOffsetResponse(requestThrottleMs, mergedResponseMap.asJava))
}
private def handleListOffsetRequestV0(request : RequestChannel.Request) : Map[TopicPartition, ListOffsetResponse.PartitionData] = {
@@ -968,15 +963,14 @@ class KafkaApis(val requestChannel: RequestChannel,
trace("Sending topic metadata %s and brokers %s for correlation id %d to client %s".format(completeTopicMetadata.mkString(","),
brokers.mkString(","), request.header.correlationId, request.header.clientId))
- def createResponse(throttleTimeMs: Int): AbstractResponse = new MetadataResponse(
- throttleTimeMs,
- brokers.map(_.getNode(request.listenerName)).asJava,
- clusterId,
- metadataCache.getControllerId.getOrElse(MetadataResponse.NO_CONTROLLER_ID),
- completeTopicMetadata.asJava
- )
-
- sendResponseMaybeThrottle(request, createResponse)
+ sendResponseMaybeThrottle(request, requestThrottleMs =>
+ new MetadataResponse(
+ requestThrottleMs,
+ brokers.map(_.getNode(request.listenerName)).asJava,
+ clusterId,
+ metadataCache.getControllerId.getOrElse(MetadataResponse.NO_CONTROLLER_ID),
+ completeTopicMetadata.asJava
+ ))
}
/**
@@ -989,11 +983,11 @@ class KafkaApis(val requestChannel: RequestChannel,
def authorizeTopicDescribe(partition: TopicPartition) =
authorize(request.session, Describe, new Resource(Topic, partition.topic))
- def createResponse(throttleTimeMs: Int): AbstractResponse = {
+ def createResponse(requestThrottleMs: Int): AbstractResponse = {
val offsetFetchResponse =
// reject the request if not authorized to the group
if (!authorize(request.session, Read, new Resource(Group, offsetFetchRequest.groupId)))
- offsetFetchRequest.getErrorResponse(throttleTimeMs, Errors.GROUP_AUTHORIZATION_FAILED)
+ offsetFetchRequest.getErrorResponse(requestThrottleMs, Errors.GROUP_AUTHORIZATION_FAILED)
else {
if (header.apiVersion == 0) {
val (authorizedPartitions, unauthorizedPartitions) = offsetFetchRequest.partitions.asScala
@@ -1023,17 +1017,17 @@ class KafkaApis(val requestChannel: RequestChannel,
}.toMap
val unauthorizedPartitionData = unauthorizedPartitions.map(_ -> OffsetFetchResponse.UNKNOWN_PARTITION).toMap
- new OffsetFetchResponse(throttleTimeMs, Errors.NONE, (authorizedPartitionData ++ unauthorizedPartitionData).asJava)
+ new OffsetFetchResponse(requestThrottleMs, Errors.NONE, (authorizedPartitionData ++ unauthorizedPartitionData).asJava)
} else {
// versions 1 and above read offsets from Kafka
if (offsetFetchRequest.isAllPartitions) {
val (error, allPartitionData) = groupCoordinator.handleFetchOffsets(offsetFetchRequest.groupId)
if (error != Errors.NONE)
- offsetFetchRequest.getErrorResponse(throttleTimeMs, error)
+ offsetFetchRequest.getErrorResponse(requestThrottleMs, error)
else {
// clients are not allowed to see offsets for topics that are not authorized for Describe
val authorizedPartitionData = allPartitionData.filter { case (topicPartition, _) => authorizeTopicDescribe(topicPartition) }
- new OffsetFetchResponse(throttleTimeMs, Errors.NONE, authorizedPartitionData.asJava)
+ new OffsetFetchResponse(requestThrottleMs, Errors.NONE, authorizedPartitionData.asJava)
}
} else {
val (authorizedPartitions, unauthorizedPartitions) = offsetFetchRequest.partitions.asScala
@@ -1041,10 +1035,10 @@ class KafkaApis(val requestChannel: RequestChannel,
val (error, authorizedPartitionData) = groupCoordinator.handleFetchOffsets(offsetFetchRequest.groupId,
Some(authorizedPartitions))
if (error != Errors.NONE)
- offsetFetchRequest.getErrorResponse(throttleTimeMs, error)
+ offsetFetchRequest.getErrorResponse(requestThrottleMs, error)
else {
val unauthorizedPartitionData = unauthorizedPartitions.map(_ -> OffsetFetchResponse.UNKNOWN_PARTITION).toMap
- new OffsetFetchResponse(throttleTimeMs, Errors.NONE, (authorizedPartitionData ++ unauthorizedPartitionData).asJava)
+ new OffsetFetchResponse(requestThrottleMs, Errors.NONE, (authorizedPartitionData ++ unauthorizedPartitionData).asJava)
}
}
}
@@ -1060,9 +1054,8 @@ class KafkaApis(val requestChannel: RequestChannel,
if (findCoordinatorRequest.coordinatorType == FindCoordinatorRequest.CoordinatorType.GROUP &&
!authorize(request.session, Describe, new Resource(Group, findCoordinatorRequest.coordinatorKey))) {
-
- def createResponse(throttleTimeMs: Int): AbstractResponse = new FindCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED, Node.noNode)
- sendResponseMaybeThrottle(request, createResponse)
+ sendResponseMaybeThrottle(request, requestThrottleMs =>
+ new FindCoordinatorResponse(requestThrottleMs, Errors.GROUP_AUTHORIZATION_FAILED, Node.noNode))
} else {
// TODO: Authorize by transactional id if coordinator type is TRANSACTION
@@ -1082,19 +1075,19 @@ class KafkaApis(val requestChannel: RequestChannel,
throw new InvalidRequestException("Unknown coordinator type in FindCoordinator request")
}
- def createResponse(throttleTimeMs: Int): AbstractResponse = {
+ def createResponse(requestThrottleMs: Int): AbstractResponse = {
val responseBody = if (topicMetadata.error != Errors.NONE) {
- new FindCoordinatorResponse(throttleTimeMs, Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)
+ new FindCoordinatorResponse(requestThrottleMs, Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)
} else {
val coordinatorEndpoint = topicMetadata.partitionMetadata.asScala
.find(_.partition == partition)
- .map(_.leader())
+ .map(_.leader)
coordinatorEndpoint match {
case Some(endpoint) if !endpoint.isEmpty =>
- new FindCoordinatorResponse(throttleTimeMs, Errors.NONE, endpoint)
+ new FindCoordinatorResponse(requestThrottleMs, Errors.NONE, endpoint)
case _ =>
- new FindCoordinatorResponse(throttleTimeMs, Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)
+ new FindCoordinatorResponse(requestThrottleMs, Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)
}
}
trace("Sending FindCoordinator response %s for correlation id %d to client %s."
@@ -1108,7 +1101,7 @@ class KafkaApis(val requestChannel: RequestChannel,
def handleDescribeGroupRequest(request: RequestChannel.Request) {
val describeRequest = request.body[DescribeGroupsRequest]
- val groups = describeRequest.groupIds().asScala.map { groupId =>
+ val groups = describeRequest.groupIds.asScala.map { groupId =>
if (!authorize(request.session, Describe, new Resource(Group, groupId))) {
groupId -> DescribeGroupsResponse.GroupMetadata.forError(Errors.GROUP_AUTHORIZATION_FAILED)
} else {
@@ -1123,19 +1116,18 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}.toMap
- def createResponse(throttleTimeMs: Int): AbstractResponse = new DescribeGroupsResponse(throttleTimeMs, groups.asJava)
- sendResponseMaybeThrottle(request, createResponse)
+ sendResponseMaybeThrottle(request, requestThrottleMs => new DescribeGroupsResponse(requestThrottleMs, groups.asJava))
}
def handleListGroupsRequest(request: RequestChannel.Request) {
if (!authorize(request.session, Describe, Resource.ClusterResource)) {
- def createResponse(throttleTimeMs: Int): AbstractResponse = ListGroupsResponse.fromError(throttleTimeMs, Errors.CLUSTER_AUTHORIZATION_FAILED)
- sendResponseMaybeThrottle(request, createResponse)
+ sendResponseMaybeThrottle(request, requestThrottleMs =>
+ ListGroupsResponse.fromError(requestThrottleMs, Errors.CLUSTER_AUTHORIZATION_FAILED))
} else {
val (error, groups) = groupCoordinator.handleListGroups()
val allGroups = groups.map { group => new ListGroupsResponse.Group(group.groupId, group.protocolType) }
- def createResponse(throttleTimeMs: Int): AbstractResponse = new ListGroupsResponse(throttleTimeMs, error, allGroups.asJava)
- sendResponseMaybeThrottle(request, createResponse)
+ sendResponseMaybeThrottle(request, requestThrottleMs =>
+ new ListGroupsResponse(requestThrottleMs, error, allGroups.asJava))
}
}
@@ -1145,8 +1137,8 @@ class KafkaApis(val requestChannel: RequestChannel,
// the callback for sending a join-group response
def sendResponseCallback(joinResult: JoinGroupResult) {
val members = joinResult.members map { case (memberId, metadataArray) => (memberId, ByteBuffer.wrap(metadataArray)) }
- def createResponse(throttleTimeMs: Int): AbstractResponse = {
- val responseBody = new JoinGroupResponse(throttleTimeMs, joinResult.error, joinResult.generationId,
+ def createResponse(requestThrottleMs: Int): AbstractResponse = {
+ val responseBody = new JoinGroupResponse(requestThrottleMs, joinResult.error, joinResult.generationId,
joinResult.subProtocol, joinResult.memberId, joinResult.leaderId, members.asJava)
trace("Sending join group response %s for correlation id %d to client %s."
@@ -1157,15 +1149,16 @@ class KafkaApis(val requestChannel: RequestChannel,
}
if (!authorize(request.session, Read, new Resource(Group, joinGroupRequest.groupId()))) {
- def createResponse(throttleTimeMs: Int): AbstractResponse = new JoinGroupResponse(
- throttleTimeMs,
- Errors.GROUP_AUTHORIZATION_FAILED,
- JoinGroupResponse.UNKNOWN_GENERATION_ID,
- JoinGroupResponse.UNKNOWN_PROTOCOL,
- JoinGroupResponse.UNKNOWN_MEMBER_ID, // memberId
- JoinGroupResponse.UNKNOWN_MEMBER_ID, // leaderId
- Collections.emptyMap())
- sendResponseMaybeThrottle(request, createResponse)
+ sendResponseMaybeThrottle(request, requestThrottleMs =>
+ new JoinGroupResponse(
+ requestThrottleMs,
+ Errors.GROUP_AUTHORIZATION_FAILED,
+ JoinGroupResponse.UNKNOWN_GENERATION_ID,
+ JoinGroupResponse.UNKNOWN_PROTOCOL,
+ JoinGroupResponse.UNKNOWN_MEMBER_ID, // memberId
+ JoinGroupResponse.UNKNOWN_MEMBER_ID, // leaderId
+ Collections.emptyMap())
+ )
} else {
// let the coordinator to handle join-group
val protocols = joinGroupRequest.groupProtocols().asScala.map(protocol =>
@@ -1187,8 +1180,8 @@ class KafkaApis(val requestChannel: RequestChannel,
val syncGroupRequest = request.body[SyncGroupRequest]
def sendResponseCallback(memberState: Array[Byte], error: Errors) {
- def createResponse(throttleTimeMs: Int): AbstractResponse = new SyncGroupResponse(throttleTimeMs, error, ByteBuffer.wrap(memberState))
- sendResponseMaybeThrottle(request, createResponse)
+ sendResponseMaybeThrottle(request, requestThrottleMs =>
+ new SyncGroupResponse(requestThrottleMs, error, ByteBuffer.wrap(memberState)))
}
if (!authorize(request.session, Read, new Resource(Group, syncGroupRequest.groupId()))) {
@@ -1209,8 +1202,8 @@ class KafkaApis(val requestChannel: RequestChannel,
// the callback for sending a heartbeat response
def sendResponseCallback(error: Errors) {
- def createResponse(throttleTimeMs: Int): AbstractResponse = {
- val response = new HeartbeatResponse(throttleTimeMs, error)
+ def createResponse(requestThrottleMs: Int): AbstractResponse = {
+ val response = new HeartbeatResponse(requestThrottleMs, error)
trace("Sending heartbeat response %s for correlation id %d to client %s."
.format(response, request.header.correlationId, request.header.clientId))
response
@@ -1219,10 +1212,9 @@ class KafkaApis(val requestChannel: RequestChannel,
}
if (!authorize(request.session, Read, new Resource(Group, heartbeatRequest.groupId))) {
- def createResponse(throttleTimeMs: Int): AbstractResponse = new HeartbeatResponse(throttleTimeMs, Errors.GROUP_AUTHORIZATION_FAILED)
- sendResponseMaybeThrottle(request, createResponse)
- }
- else {
+ sendResponseMaybeThrottle(request, requestThrottleMs =>
+ new HeartbeatResponse(requestThrottleMs, Errors.GROUP_AUTHORIZATION_FAILED))
+ } else {
// let the coordinator to handle heartbeat
groupCoordinator.handleHeartbeat(
heartbeatRequest.groupId(),
@@ -1237,8 +1229,8 @@ class KafkaApis(val requestChannel: RequestChannel,
// the callback for sending a leave-group response
def sendResponseCallback(error: Errors) {
- def createResponse(throttleTimeMs: Int): AbstractResponse = {
- val response = new LeaveGroupResponse(throttleTimeMs, error)
+ def createResponse(requestThrottleMs: Int): AbstractResponse = {
+ val response = new LeaveGroupResponse(requestThrottleMs, error)
trace("Sending leave group response %s for correlation id %d to client %s."
.format(response, request.header.correlationId, request.header.clientId))
response
@@ -1247,8 +1239,8 @@ class KafkaApis(val requestChannel: RequestChannel,
}
if (!authorize(request.session, Read, new Resource(Group, leaveGroupRequest.groupId))) {
- def createResponse(throttleTimeMs: Int): AbstractResponse = new LeaveGroupResponse(throttleTimeMs, Errors.GROUP_AUTHORIZATION_FAILED)
- sendResponseMaybeThrottle(request, createResponse)
+ sendResponseMaybeThrottle(request, requestThrottleMs =>
+ new LeaveGroupResponse(requestThrottleMs, Errors.GROUP_AUTHORIZATION_FAILED))
} else {
// let the coordinator to handle leave-group
groupCoordinator.handleLeaveGroup(
@@ -1259,8 +1251,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
def handleSaslHandshakeRequest(request: RequestChannel.Request) {
- def createResponse(throttleTimeMs: Int): AbstractResponse = new SaslHandshakeResponse(Errors.ILLEGAL_SASL_STATE, config.saslEnabledMechanisms)
- sendResponseMaybeThrottle(request, createResponse)
+ sendResponseMaybeThrottle(request, _ => new SaslHandshakeResponse(Errors.ILLEGAL_SASL_STATE, config.saslEnabledMechanisms))
}
def handleApiVersionsRequest(request: RequestChannel.Request) {
@@ -1270,12 +1261,12 @@ class KafkaApis(val requestChannel: RequestChannel,
// If this is considered to leak information about the broker version a workaround is to use SSL
// with client authentication which is performed at an earlier stage of the connection where the
// ApiVersionRequest is not available.
- def sendResponseCallback(throttleTimeMs: Int) {
+ def sendResponseCallback(requestThrottleMs: Int) {
val responseSend =
if (Protocol.apiVersionSupported(ApiKeys.API_VERSIONS.id, request.header.apiVersion))
- ApiVersionsResponse.apiVersionsResponse(request.header.apiVersion, throttleTimeMs).toSend(request.connectionId, request.header)
+ ApiVersionsResponse.apiVersionsResponse(request.header.apiVersion, requestThrottleMs).toSend(request.connectionId, request.header)
else ApiVersionsResponse.unsupportedVersionSend(request.connectionId, request.header)
- requestChannel.sendResponse(new RequestChannel.Response(request, responseSend))
+ requestChannel.sendResponse(RequestChannel.Response(request, responseSend))
}
sendResponseMaybeThrottle(request, request.header.clientId, sendResponseCallback)
}
@@ -1284,8 +1275,8 @@ class KafkaApis(val requestChannel: RequestChannel,
val createTopicsRequest = request.body[CreateTopicsRequest]
def sendResponseCallback(results: Map[String, ApiError]): Unit = {
- def createResponse(throttleTimeMs: Int): AbstractResponse = {
- val responseBody = new CreateTopicsResponse(throttleTimeMs, results.asJava)
+ def createResponse(requestThrottleMs: Int): AbstractResponse = {
+ val responseBody = new CreateTopicsResponse(requestThrottleMs, results.asJava)
trace(s"Sending create topics response $responseBody for correlation id ${request.header.correlationId} to client ${request.header.clientId}.")
responseBody
}
@@ -1345,10 +1336,10 @@ class KafkaApis(val requestChannel: RequestChannel,
}
def sendResponseCallback(results: Map[String, Errors]): Unit = {
- def createResponse(throttleTimeMs: Int): AbstractResponse = {
+ def createResponse(requestThrottleMs: Int): AbstractResponse = {
val completeResults = nonExistingOrUnauthorizedForDescribeTopics.map(topic => (topic, Errors.UNKNOWN_TOPIC_OR_PARTITION)).toMap ++
unauthorizedForDeleteTopics.map(topic => (topic, Errors.TOPIC_AUTHORIZATION_FAILED)).toMap ++ results
- val responseBody = new DeleteTopicsResponse(throttleTimeMs, completeResults.asJava)
+ val responseBody = new DeleteTopicsResponse(requestThrottleMs, completeResults.asJava)
trace(s"Sending delete topics response $responseBody for correlation id ${request.header.correlationId} to client ${request.header.clientId}.")
responseBody
}
@@ -1404,8 +1395,8 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
- def createResponse(throttleTimeMs: Int): AbstractResponse = new DeleteRecordsResponse(throttleTimeMs, mergedResponseStatus.asJava)
- sendResponseMaybeThrottle(request, createResponse)
+ sendResponseMaybeThrottle(request, requestThrottleMs =>
+ new DeleteRecordsResponse(requestThrottleMs, mergedResponseStatus.asJava))
}
if (authorizedForDeleteTopics.isEmpty)
@@ -1425,12 +1416,13 @@ class KafkaApis(val requestChannel: RequestChannel,
if (!authorize(request.session, Write, Resource.ProducerIdResource)) {
- sendResponseMaybeThrottle(request, (throttleTime: Int) => new InitProducerIdResponse(throttleTime, Errors.PRODUCER_ID_AUTHORIZATION_FAILED))
+ sendResponseMaybeThrottle(request, requestThrottleMs =>
+ new InitProducerIdResponse(requestThrottleMs, Errors.PRODUCER_ID_AUTHORIZATION_FAILED))
} else if (transactionalId == null || authorize(request.session, Write, new Resource(ProducerTransactionalId, transactionalId))) {
// Send response callback
def sendResponseCallback(result: InitProducerIdResult): Unit = {
- def createResponse(throttleTimeMs: Int): AbstractResponse = {
- val responseBody = new InitProducerIdResponse(throttleTimeMs, result.error, result.producerId, result.producerEpoch)
+ def createResponse(requestThrottleMs: Int): AbstractResponse = {
+ val responseBody = new InitProducerIdResponse(requestThrottleMs, result.error, result.producerId, result.producerEpoch)
trace(s"Completed $transactionalId's InitProducerIdRequest with result $result from client ${request.header.clientId}.")
responseBody
}
@@ -1438,7 +1430,8 @@ class KafkaApis(val requestChannel: RequestChannel,
}
txnCoordinator.handleInitProducerId(transactionalId, initProducerIdRequest.transactionTimeoutMs, sendResponseCallback)
}else
- sendResponseMaybeThrottle(request, (throttleTimeMs: Int) => new InitProducerIdResponse(throttleTimeMs, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED))
+ sendResponseMaybeThrottle(request, requestThrottleMs =>
+ new InitProducerIdResponse(requestThrottleMs, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED))
}
def handleEndTxnRequest(request: RequestChannel.Request): Unit = {
@@ -1447,8 +1440,8 @@ class KafkaApis(val requestChannel: RequestChannel,
if(authorize(request.session, Write, new Resource(ProducerTransactionalId, transactionalId))) {
def sendResponseCallback(error: Errors) {
- def createResponse(throttleTimeMs: Int): AbstractResponse = {
- val responseBody = new EndTxnResponse(throttleTimeMs, error)
+ def createResponse(requestThrottleMs: Int): AbstractResponse = {
+ val responseBody = new EndTxnResponse(requestThrottleMs, error)
trace(s"Completed ${endTxnRequest.transactionalId}'s EndTxnRequest with command: ${endTxnRequest.command}, errors: $error from client ${request.header.clientId}.")
responseBody
}
@@ -1461,7 +1454,8 @@ class KafkaApis(val requestChannel: RequestChannel,
endTxnRequest.command,
sendResponseCallback)
} else
- sendResponseMaybeThrottle(request, (throttleTimeMs: Int) => new EndTxnResponse(throttleTimeMs, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED))
+ sendResponseMaybeThrottle(request, requestThrottleMs =>
+ new EndTxnResponse(requestThrottleMs, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED))
}
def handleWriteTxnMarkersRequest(request: RequestChannel.Request): Unit = {
@@ -1472,7 +1466,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val numAppends = new AtomicInteger(markers.size)
if (numAppends.get == 0) {
- sendResponseExemptThrottle(request, new RequestChannel.Response(request, new WriteTxnMarkersResponse(errors)))
+ sendResponseExemptThrottle(RequestChannel.Response(request, new WriteTxnMarkersResponse(errors)))
return
}
@@ -1497,7 +1491,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
if (numAppends.decrementAndGet() == 0)
- sendResponseExemptThrottle(request, new RequestChannel.Response(request, new WriteTxnMarkersResponse(errors)))
+ sendResponseExemptThrottle(RequestChannel.Response(request, new WriteTxnMarkersResponse(errors)))
}
// TODO: The current append API makes doing separate writes per producerId a little easier, but it would
@@ -1531,7 +1525,8 @@ class KafkaApis(val requestChannel: RequestChannel,
val partitionsToAdd = addPartitionsToTxnRequest.partitions
if(!authorize(request.session, Write, new Resource(ProducerTransactionalId, transactionalId)))
- sendResponseMaybeThrottle(request, (throttleTimeMs: Int) => addPartitionsToTxnRequest.getErrorResponse(1, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception()))
+ sendResponseMaybeThrottle(request, requestThrottleMs =>
+ addPartitionsToTxnRequest.getErrorResponse(requestThrottleMs, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception()))
else {
val internalTopics = partitionsToAdd.asScala.filter {tp => org.apache.kafka.common.internals.Topic.isInternal(tp.topic())}
@@ -1554,12 +1549,13 @@ class KafkaApis(val requestChannel: RequestChannel,
nonExistingOrUnauthorizedForDescribeTopics.map { tp => (tp, Errors.UNKNOWN_TOPIC_OR_PARTITION) }.toMap ++
internalTopics.map { tp => (tp, Errors.TOPIC_AUTHORIZATION_FAILED) }
- sendResponseMaybeThrottle(request, (throttleTimeMs: Int) => new AddPartitionsToTxnResponse(throttleTimeMs, partitionErrors.asJava))
+ sendResponseMaybeThrottle(request, requestThrottleMs =>
+ new AddPartitionsToTxnResponse(requestThrottleMs, partitionErrors.asJava))
} else {
// Send response callback
def sendResponseCallback(error: Errors): Unit = {
- def createResponse(throttleTimeMs: Int): AbstractResponse = {
- val responseBody: AddPartitionsToTxnResponse = new AddPartitionsToTxnResponse(throttleTimeMs,
+ def createResponse(requestThrottleMs: Int): AbstractResponse = {
+ val responseBody: AddPartitionsToTxnResponse = new AddPartitionsToTxnResponse(requestThrottleMs,
partitionsToAdd.asScala.map{tp => (tp, error)}.toMap.asJava)
trace(s"Completed $transactionalId's AddPartitionsToTxnRequest with partitions $partitionsToAdd: errors: $error from client ${request.header.clientId}")
responseBody
@@ -1586,14 +1582,16 @@ class KafkaApis(val requestChannel: RequestChannel,
val offsetTopicPartition = new TopicPartition(GROUP_METADATA_TOPIC_NAME, groupCoordinator.partitionFor(groupId))
if (!authorize(request.session, Write, new Resource(ProducerTransactionalId, transactionalId)))
- sendResponseMaybeThrottle(request, (throttleTimeMs: Int) => new AddOffsetsToTxnResponse(throttleTimeMs, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED))
+ sendResponseMaybeThrottle(request, requestThrottleMs =>
+ new AddOffsetsToTxnResponse(requestThrottleMs, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED))
else if (!authorize(request.session, Read, new Resource(Group, groupId)))
- sendResponseMaybeThrottle(request, (throttleTimeMs: Int) => new AddOffsetsToTxnResponse(throttleTimeMs, Errors.GROUP_AUTHORIZATION_FAILED))
+ sendResponseMaybeThrottle(request, requestThrottleMs =>
+ new AddOffsetsToTxnResponse(requestThrottleMs, Errors.GROUP_AUTHORIZATION_FAILED))
else {
// Send response callback
def sendResponseCallback(error: Errors): Unit = {
- def createResponse(throttleTimeMs: Int): AbstractResponse = {
- val responseBody: AddOffsetsToTxnResponse = new AddOffsetsToTxnResponse(throttleTimeMs, error)
+ def createResponse(requestThrottleMs: Int): AbstractResponse = {
+ val responseBody: AddOffsetsToTxnResponse = new AddOffsetsToTxnResponse(requestThrottleMs, error)
trace(s"Completed $transactionalId's AddOffsetsToTxnRequest for group $groupId as on partition $offsetTopicPartition: errors: $error from client ${request.header.clientId}")
responseBody
}
@@ -1618,8 +1616,8 @@ class KafkaApis(val requestChannel: RequestChannel,
val results = txnOffsetCommitRequest.offsets.keySet.asScala.map { topicPartition =>
(topicPartition, error)
}.toMap
- def createResponse(throttleTimeMs: Int): AbstractResponse = new TxnOffsetCommitResponse(throttleTimeMs, results.asJava)
- sendResponseMaybeThrottle(request, createResponse)
+ sendResponseMaybeThrottle(request, requestThrottleMs =>
+ new TxnOffsetCommitResponse(requestThrottleMs, results.asJava))
} else {
val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = txnOffsetCommitRequest.offsets.asScala.toMap.partition {
case (topicPartition, _) =>
@@ -1649,8 +1647,8 @@ class KafkaApis(val requestChannel: RequestChannel,
s"on partition $topicPartition failed due to ${error.exceptionName}")
}
}
- def createResponse(throttleTimeMs: Int): AbstractResponse = new TxnOffsetCommitResponse(throttleTimeMs, combinedCommitStatus.asJava)
- sendResponseMaybeThrottle(request, createResponse)
+ sendResponseMaybeThrottle(request, requestThrottleMs =>
+ new TxnOffsetCommitResponse(requestThrottleMs, combinedCommitStatus.asJava))
}
if (authorizedTopics.isEmpty)
@@ -1685,29 +1683,24 @@ class KafkaApis(val requestChannel: RequestChannel,
val describeAclsRequest = request.body[DescribeAclsRequest]
authorizer match {
case None =>
- def createResponse(throttleTimeMs: Int): AbstractResponse =
- new DescribeAclsResponse(throttleTimeMs, new SecurityDisabledException(
- "No Authorizer is configured on the broker."), Collections.emptySet[AclBinding]);
- sendResponseMaybeThrottle(request, createResponse)
+ sendResponseMaybeThrottle(request, requestThrottleMs =>
+ new DescribeAclsResponse(requestThrottleMs,
+ new SecurityDisabledException("No Authorizer is configured on the broker."),
+ Collections.emptySet()))
case Some(auth) =>
val filter = describeAclsRequest.filter()
- var returnedAcls = new util.ArrayList[AclBinding]
- val aclMap : Map[Resource, Set[Acl]] = auth.getAcls()
- aclMap.foreach {
- case (resource, acls) => {
- acls.foreach {
- case (acl) => {
- val fixture = new AclBinding(new AdminResource(AdminResourceType.fromString(resource.resourceType.toString), resource.name),
- new AccessControlEntry(acl.principal.toString(), acl.host.toString(), acl.operation.toJava, acl.permissionType.toJava))
- if (filter.matches(fixture))
- returnedAcls.add(fixture)
- }
- }
+ val returnedAcls = new util.ArrayList[AclBinding]
+ val aclMap = auth.getAcls()
+ aclMap.foreach { case (resource, acls) =>
+ acls.foreach { acl =>
+ val fixture = new AclBinding(new AdminResource(AdminResourceType.fromString(resource.resourceType.toString), resource.name),
+ new AccessControlEntry(acl.principal.toString, acl.host.toString, acl.operation.toJava, acl.permissionType.toJava))
+ if (filter.matches(fixture))
+ returnedAcls.add(fixture)
}
}
- def createResponse(throttleTimeMs: Int): AbstractResponse =
- new DescribeAclsResponse(throttleTimeMs, null, returnedAcls)
- sendResponseMaybeThrottle(request, createResponse)
+ sendResponseMaybeThrottle(request, requestThrottleMs =>
+ new DescribeAclsResponse(requestThrottleMs, null, returnedAcls))
}
}
@@ -1780,14 +1773,13 @@ class KafkaApis(val requestChannel: RequestChannel,
val createAclsRequest = request.body[CreateAclsRequest]
authorizer match {
case None =>
- def createResponse(throttleTimeMs: Int): AbstractResponse =
- createAclsRequest.getErrorResponse(throttleTimeMs,
- new SecurityDisabledException("No Authorizer is configured on the broker."))
- sendResponseMaybeThrottle(request, createResponse)
+ sendResponseMaybeThrottle(request, requestThrottleMs =>
+ createAclsRequest.getErrorResponse(requestThrottleMs,
+ new SecurityDisabledException("No Authorizer is configured on the broker.")))
case Some(auth) =>
val errors = mutable.HashMap[Int, Throwable]()
- var creations = ListBuffer[(Resource, Acl)]()
- for (i <- 0 to createAclsRequest.aclCreations().size() - 1) {
+ val creations = ListBuffer[(Resource, Acl)]()
+ for (i <- 0 until createAclsRequest.aclCreations.size) {
val result = toScala(createAclsRequest.aclCreations.get(i).acl.toFilter)
result match {
case Failure(throwable) => errors.put(i, throwable)
@@ -1796,7 +1788,7 @@ class KafkaApis(val requestChannel: RequestChannel,
!resource.name.equals(Resource.ClusterResourceName))
throw new InvalidRequestException("The only valid name for the CLUSTER resource is " +
Resource.ClusterResourceName)
- if (resource.name.isEmpty())
+ if (resource.name.isEmpty)
throw new InvalidRequestException("Invalid empty resource name")
auth.addAcls(immutable.Set(acl), resource)
} catch {
@@ -1804,16 +1796,15 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
}
- var aclCreationResults = new java.util.ArrayList[AclCreationResponse]
+ val aclCreationResults = new java.util.ArrayList[AclCreationResponse]
for (i <- 0 to createAclsRequest.aclCreations().size() - 1) {
errors.get(i) match {
case Some(throwable) => aclCreationResults.add(new AclCreationResponse(throwable))
case None => aclCreationResults.add(new AclCreationResponse(null))
}
}
- def createResponse(throttleTimeMs: Int): AbstractResponse =
- new CreateAclsResponse(throttleTimeMs, aclCreationResults)
- sendResponseMaybeThrottle(request, createResponse)
+ sendResponseMaybeThrottle(request, requestThrottleMs =>
+ new CreateAclsResponse(requestThrottleMs, aclCreationResults))
}
}
@@ -1822,33 +1813,28 @@ class KafkaApis(val requestChannel: RequestChannel,
val deleteAclsRequest = request.body[DeleteAclsRequest]
authorizer match {
case None =>
- def createResponse(throttleTimeMs: Int): AbstractResponse =
- deleteAclsRequest.getErrorResponse(throttleTimeMs,
- new SecurityDisabledException("No Authorizer is configured on the broker."))
- sendResponseMaybeThrottle(request, createResponse)
+ sendResponseMaybeThrottle(request, requestThrottleMs =>
+ deleteAclsRequest.getErrorResponse(requestThrottleMs,
+ new SecurityDisabledException("No Authorizer is configured on the broker.")))
case Some(auth) =>
val filterResponseMap = mutable.HashMap[Int, AclFilterResponse]()
- var toDelete = mutable.HashMap[Int, ListBuffer[(Resource, Acl)]]()
+ val toDelete = mutable.HashMap[Int, ListBuffer[(Resource, Acl)]]()
for (i <- 0 to deleteAclsRequest.filters().size - 1) {
toDelete.put(i, new ListBuffer[(Resource, Acl)]())
}
if (deleteAclsRequest.filters().asScala.exists { f => !f.matchesAtMostOne() }) {
// Delete based on filters that may match more than one ACL.
val aclMap : Map[Resource, Set[Acl]] = auth.getAcls()
- aclMap.foreach {
- case (resource, acls) => {
- acls.foreach {
- case (acl) => {
- val binding = new AclBinding(new AdminResource(AdminResourceType.
- fromString(resource.resourceType.toString), resource.name),
- new AccessControlEntry(acl.principal.toString(), acl.host.toString(),
- acl.operation.toJava, acl.permissionType.toJava))
- for (i <- 0 to deleteAclsRequest.filters().size - 1) {
- val filter = deleteAclsRequest.filters().get(i)
- if (filter.matches(binding)) {
- toDelete.get(i).get += ((resource, acl))
- }
- }
+ aclMap.foreach { case (resource, acls) =>
+ acls.foreach { acl =>
+ val binding = new AclBinding(new AdminResource(AdminResourceType.
+ fromString(resource.resourceType.toString), resource.name),
+ new AccessControlEntry(acl.principal.toString(), acl.host.toString(),
+ acl.operation.toJava, acl.permissionType.toJava))
+ for (i <- 0 to deleteAclsRequest.filters().size - 1) {
+ val filter = deleteAclsRequest.filters().get(i)
+ if (filter.matches(binding)) {
+ toDelete.get(i).get += ((resource, acl))
}
}
}
@@ -1885,9 +1871,7 @@ class KafkaApis(val requestChannel: RequestChannel,
filterResponses.add(filterResponseMap.getOrElse(i,
new AclFilterResponse(null, new util.ArrayList[AclDeletionResult]())))
}
- def createResponse(throttleTimeMs: Int): AbstractResponse =
- new DeleteAclsResponse(throttleTimeMs, filterResponses)
- sendResponseMaybeThrottle(request, createResponse)
+ sendResponseMaybeThrottle(request, requestThrottleMs => new DeleteAclsResponse(requestThrottleMs, filterResponses))
}
}
@@ -1899,13 +1883,13 @@ class KafkaApis(val requestChannel: RequestChannel,
val responseBody = new OffsetsForLeaderEpochResponse(
replicaManager.lastOffsetForLeaderEpoch(requestInfo.asScala).asJava
)
- sendResponseExemptThrottle(request, new RequestChannel.Response(request, responseBody))
+ sendResponseExemptThrottle(RequestChannel.Response(request, responseBody))
}
private def handleError(request: RequestChannel.Request, e: Throwable) {
val mayThrottle = e.isInstanceOf[ClusterAuthorizationException] || !ApiKeys.forId(request.requestId).clusterAction
if (request.requestObj != null) {
- def sendResponseCallback(throttleTimeMs: Int) {
+ def sendResponseCallback(requestThrottleMs: Int) {
request.requestObj.handleError(e, requestChannel, request)
error("Error when handling request %s".format(request.requestObj), e)
}
@@ -1920,20 +1904,21 @@ class KafkaApis(val requestChannel: RequestChannel,
} else
sendResponseExemptThrottle(request, () => sendResponseCallback(0))
} else {
- def createResponse(throttleTimeMs: Int): AbstractResponse = {
- val response = request.body[AbstractRequest].getErrorResponse(throttleTimeMs, e)
-
+ def createResponse(requestThrottleMs: Int): RequestChannel.Response = {
+ val response = request.body[AbstractRequest].getErrorResponse(requestThrottleMs, e)
/* If request doesn't have a default error response, we just close the connection.
For example, when produce request has acks set to 0 */
if (response == null)
- requestChannel.closeConnection(request.processor, request)
- response
+ new RequestChannel.Response(request, None, RequestChannel.CloseConnectionAction)
+ else RequestChannel.Response(request, response)
}
error("Error when handling request %s".format(request.body[AbstractRequest]), e)
if (mayThrottle)
- sendResponseMaybeThrottle(request, createResponse)
+ sendResponseMaybeThrottle(request, request.header.clientId, { requestThrottleMs =>
+ requestChannel.sendResponse(createResponse(requestThrottleMs))
+ })
else
- sendResponseExemptThrottle(request, new RequestChannel.Response(request, createResponse(0)))
+ sendResponseExemptThrottle(createResponse(0))
}
}
@@ -1954,7 +1939,8 @@ class KafkaApis(val requestChannel: RequestChannel,
val unauthorizedResult = unauthorizedResources.keys.map { resource =>
resource -> configsAuthorizationApiError(request.session, resource)
}
- sendResponseMaybeThrottle(request, new AlterConfigsResponse(_, (authorizedResult ++ unauthorizedResult).asJava))
+ sendResponseMaybeThrottle(request, requestThrottleMs =>
+ new AlterConfigsResponse(requestThrottleMs, (authorizedResult ++ unauthorizedResult).asJava))
}
private def configsAuthorizationApiError(session: RequestChannel.Session, resource: RResource): ApiError = {
@@ -1992,7 +1978,8 @@ class KafkaApis(val requestChannel: RequestChannel,
resource -> new DescribeConfigsResponse.Config(error, Collections.emptyList[DescribeConfigsResponse.ConfigEntry])
}
- sendResponseMaybeThrottle(request, new DescribeConfigsResponse(_, (authorizedConfigs ++ unauthorizedConfigs).asJava))
+ sendResponseMaybeThrottle(request, requestThrottleMs =>
+ new DescribeConfigsResponse(requestThrottleMs, (authorizedConfigs ++ unauthorizedConfigs).asJava))
}
def authorizeClusterAction(request: RequestChannel.Request): Unit = {
@@ -2001,12 +1988,9 @@ class KafkaApis(val requestChannel: RequestChannel,
}
private def sendResponseMaybeThrottle(request: RequestChannel.Request, createResponse: Int => AbstractResponse) {
- def sendResponseCallback(throttleTimeMs: Int) {
- val response = createResponse(throttleTimeMs)
- if (response != null)
- sendResponse(request, response)
- }
- sendResponseMaybeThrottle(request, request.header.clientId, sendResponseCallback)
+ sendResponseMaybeThrottle(request, request.header.clientId, { requestThrottleMs =>
+ sendResponse(request, createResponse(requestThrottleMs))
+ })
}
private def sendResponseMaybeThrottle(request: RequestChannel.Request, clientId: String, sendResponseCallback: Int => Unit) {
@@ -2027,8 +2011,8 @@ class KafkaApis(val requestChannel: RequestChannel,
sendResponseCallback)
}
- private def sendResponseExemptThrottle(request: RequestChannel.Request, response: Response) {
- sendResponseExemptThrottle(request, () => requestChannel.sendResponse(response))
+ private def sendResponseExemptThrottle(response: RequestChannel.Response) {
+ sendResponseExemptThrottle(response.request, () => requestChannel.sendResponse(response))
}
private def sendResponseExemptThrottle(request: RequestChannel.Request, sendResponseCallback: () => Unit) {
@@ -2042,7 +2026,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
private def sendResponse(request: RequestChannel.Request, response: AbstractResponse) {
- requestChannel.sendResponse(new Response(request, response))
+ requestChannel.sendResponse(RequestChannel.Response(request, response))
}
private def nanosToPercentage(nanos: Long): Double = nanos * ClientQuotaManagerConfig.NanosToPercentagePerSecond
http://git-wip-us.apache.org/repos/asf/kafka/blob/b661d3b8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 7678550..acf96e8 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -30,7 +30,7 @@ import kafka.server.KafkaConfig
import kafka.utils.TestUtils
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.network.{ListenerName, NetworkSend}
+import org.apache.kafka.common.network.{ListenerName, NetworkSend, Send}
import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol}
import org.apache.kafka.common.record.{MemoryRecords, RecordBatch}
import org.apache.kafka.common.requests.{AbstractRequest, ProduceRequest, RequestHeader}
@@ -98,7 +98,7 @@ class SocketServerTest extends JUnitSuite {
byteBuffer.rewind()
val send = new NetworkSend(request.connectionId, byteBuffer)
- channel.sendResponse(new RequestChannel.Response(request, send))
+ channel.sendResponse(RequestChannel.Response(request, send))
}
def connect(s: SocketServer = server, protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT) = {
@@ -173,13 +173,13 @@ class SocketServerTest extends JUnitSuite {
val plainSocket = connect(protocol = SecurityProtocol.PLAINTEXT)
val serializedBytes = producerRequestBytes
- for (i <- 0 until 10)
+ for (_ <- 0 until 10)
sendRequest(plainSocket, serializedBytes)
plainSocket.close()
- for (i <- 0 until 10) {
+ for (_ <- 0 until 10) {
val request = server.requestChannel.receiveRequest(2000)
assertNotNull("receiveRequest timed out", request)
- server.requestChannel.noOperation(request.processor, request)
+ server.requestChannel.sendResponse(RequestChannel.Response(request, None, RequestChannel.NoOpAction))
}
}
@@ -331,9 +331,9 @@ class SocketServerTest extends JUnitSuite {
protocol: SecurityProtocol): Processor = {
new Processor(id, time, config.socketRequestMaxBytes, requestChannel, connectionQuotas,
config.connectionsMaxIdleMs, listenerName, protocol, config, metrics, credentialProvider) {
- override protected[network] def sendResponse(response: RequestChannel.Response) {
+ override protected[network] def sendResponse(response: RequestChannel.Response, responseSend: Send) {
conn.close()
- super.sendResponse(response)
+ super.sendResponse(response, responseSend)
}
}
}
@@ -357,7 +357,7 @@ class SocketServerTest extends JUnitSuite {
// detected. If the buffer is larger than 102400 bytes, a second write is attempted and it fails with an
// IOException.
val send = new NetworkSend(request.connectionId, ByteBuffer.allocate(550000))
- channel.sendResponse(new RequestChannel.Response(request, send))
+ channel.sendResponse(RequestChannel.Response(request, send))
TestUtils.waitUntilTrue(() => totalTimeHistCount() == expectedTotalTimeCount,
s"request metrics not updated, expected: $expectedTotalTimeCount, actual: ${totalTimeHistCount()}")