Posted to commits@kafka.apache.org by ij...@apache.org on 2017/05/18 23:14:29 UTC

kafka git commit: MINOR: Small refactor of request quotas handling in KafkaApis

Repository: kafka
Updated Branches:
  refs/heads/trunk 56623efd7 -> b661d3b8a


MINOR: Small refactor of request quotas handling in KafkaApis

- Avoid unnecessary inner methods
- Remove redundant parameter in `sendResponseExemptThrottle`
- Go through `sendResponseExemptThrottle` for produce requests with acks=0
- Tighten how we handle cases where there’s no response
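Illustrative sketch (not the actual kafka.network classes; the types below are simplified stand-ins) of the shape this refactor gives `RequestChannel.Response` in the diff below: the `Send` becomes an `Option`, the `ResponseAction` is always explicit, and the acks=0 error path builds a `Response` with `CloseConnectionAction` instead of calling the removed `closeConnection`/`noOperation` helpers.

```scala
object ResponseSketch {
  sealed trait ResponseAction
  case object SendAction extends ResponseAction
  case object NoOpAction extends ResponseAction
  case object CloseConnectionAction extends ResponseAction

  // Stand-ins for kafka.network types, just enough to compile this sketch.
  final case class Send(destination: String)
  final case class Request(processor: Int, connectionId: String)

  case class Response(request: Request, responseSend: Option[Send], responseAction: ResponseAction) {
    // The processor id now comes from the request instead of a redundant constructor parameter.
    def processor: Int = request.processor
  }

  object Response {
    // Mirrors the new companion apply: a concrete Send always implies SendAction.
    def apply(request: Request, responseSend: Send): Response = {
      require(responseSend != null, "responseSend should be non null")
      new Response(request, Some(responseSend), SendAction)
    }
  }

  def main(args: Array[String]): Unit = {
    val request = Request(processor = 1, connectionId = "conn-1")
    // Normal path: a Send plus SendAction.
    val sent = Response(request, Send(request.connectionId))
    // acks=0 error path: no Send at all, just close the connection.
    val closed = new Response(request, None, CloseConnectionAction)
    println(s"$sent / $closed")
  }
}
```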

Author: Ismael Juma <is...@juma.me.uk>

Reviewers: Rajini Sivaram <ra...@googlemail.com>

Closes #3087 from ijuma/kafka-apis-improvements


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b661d3b8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b661d3b8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b661d3b8

Branch: refs/heads/trunk
Commit: b661d3b8ab37fc63b44145ba29d129b07c1582b9
Parents: 56623ef
Author: Ismael Juma <is...@juma.me.uk>
Authored: Fri May 19 00:14:08 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Fri May 19 00:14:08 2017 +0100

----------------------------------------------------------------------
 .../kafka/api/ControlledShutdownRequest.scala   |   2 +-
 .../src/main/scala/kafka/api/FetchRequest.scala |   2 +-
 .../kafka/api/GroupCoordinatorRequest.scala     |   4 +-
 .../scala/kafka/api/OffsetCommitRequest.scala   |   2 +-
 .../scala/kafka/api/OffsetFetchRequest.scala    |   2 +-
 .../main/scala/kafka/api/OffsetRequest.scala    |   2 +-
 .../main/scala/kafka/api/ProducerRequest.scala  |   4 +-
 .../scala/kafka/api/TopicMetadataRequest.scala  |   2 +-
 .../scala/kafka/network/RequestChannel.scala    |  40 +-
 .../main/scala/kafka/network/SocketServer.scala |  12 +-
 .../src/main/scala/kafka/server/KafkaApis.scala | 390 +++++++++----------
 .../unit/kafka/network/SocketServerTest.scala   |  16 +-
 12 files changed, 231 insertions(+), 247 deletions(-)
----------------------------------------------------------------------
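Before the per-file diffs, a brief stand-alone sketch (names and signatures are simplified stand-ins, not the real KafkaApis plumbing) of the caller-side pattern most of the `KafkaApis.scala` changes follow: the single-use inner `def createResponse(throttleTimeMs: Int)` helpers are replaced by passing a lambda directly to `sendResponseMaybeThrottle`.

```scala
object ThrottleCallbackSketch {
  final case class Request(clientId: String)

  // Stand-in for the quota plumbing: compute a throttle time and invoke the callback.
  def sendResponseMaybeThrottle(request: Request, createResponse: Int => String): Unit = {
    val requestThrottleMs = 0 // quota accounting elided in this sketch
    println(s"${request.clientId}: ${createResponse(requestThrottleMs)}")
  }

  def main(args: Array[String]): Unit = {
    val request = Request("client-1")

    // Old style: an inner method that exists only to be passed once.
    def createResponse(throttleTimeMs: Int): String =
      s"ListGroupsResponse(throttleTimeMs=$throttleTimeMs)"
    sendResponseMaybeThrottle(request, createResponse)

    // New style: inline lambda, same behaviour, less ceremony.
    sendResponseMaybeThrottle(request, requestThrottleMs =>
      s"ListGroupsResponse(throttleTimeMs=$requestThrottleMs)")
  }
}
```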


http://git-wip-us.apache.org/repos/asf/kafka/blob/b661d3b8/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala b/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
index 46ae1e7..a0ad6cf 100644
--- a/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
+++ b/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
@@ -69,7 +69,7 @@ case class ControlledShutdownRequest(versionId: Short,
 
   override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
     val errorResponse = ControlledShutdownResponse(correlationId, Errors.forException(e), Set.empty[TopicAndPartition])
-    requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
+    requestChannel.sendResponse(Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
   }
 
   override def describe(details: Boolean = false): String = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/b661d3b8/core/src/main/scala/kafka/api/FetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala
index 39da605..60284f7 100644
--- a/core/src/main/scala/kafka/api/FetchRequest.scala
+++ b/core/src/main/scala/kafka/api/FetchRequest.scala
@@ -208,7 +208,7 @@ case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
     }
     val errorResponse = new JFetchResponse(responseData, 0)
     // Magic value does not matter here because the message set is empty
-    requestChannel.sendResponse(new RequestChannel.Response(request, errorResponse))
+    requestChannel.sendResponse(RequestChannel.Response(request, errorResponse))
   }
 
   override def describe(details: Boolean): String = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/b661d3b8/core/src/main/scala/kafka/api/GroupCoordinatorRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/GroupCoordinatorRequest.scala b/core/src/main/scala/kafka/api/GroupCoordinatorRequest.scala
index d99474d..b3616fa 100644
--- a/core/src/main/scala/kafka/api/GroupCoordinatorRequest.scala
+++ b/core/src/main/scala/kafka/api/GroupCoordinatorRequest.scala
@@ -65,7 +65,7 @@ case class GroupCoordinatorRequest(group: String,
   override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
     // return ConsumerCoordinatorNotAvailable for all uncaught errors
     val errorResponse = GroupCoordinatorResponse(None, Errors.COORDINATOR_NOT_AVAILABLE, correlationId)
-    requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
+    requestChannel.sendResponse(Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
   }
 
   def describe(details: Boolean) = {
@@ -77,4 +77,4 @@ case class GroupCoordinatorRequest(group: String,
     consumerMetadataRequest.append("; Group: " + group)
     consumerMetadataRequest.toString()
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/b661d3b8/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
index b9693f6..e598500 100644
--- a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
@@ -165,7 +165,7 @@ case class OffsetCommitRequest(groupId: String,
     val commitStatus = requestInfo.mapValues(_ => error)
     val commitResponse = OffsetCommitResponse(commitStatus, correlationId)
 
-    requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, commitResponse)))
+    requestChannel.sendResponse(Response(request, new RequestOrResponseSend(request.connectionId, commitResponse)))
   }
 
   override def describe(details: Boolean): String = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/b661d3b8/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetFetchRequest.scala b/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
index 33d3795..310860f 100644
--- a/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
@@ -104,7 +104,7 @@ case class OffsetFetchRequest(groupId: String,
       }
 
     val errorResponse = OffsetFetchResponse(requestInfo=responseMap, correlationId=correlationId, error=thrownError)
-    requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
+    requestChannel.sendResponse(Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
   }
 
   override def describe(details: Boolean): String = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/b661d3b8/core/src/main/scala/kafka/api/OffsetRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetRequest.scala b/core/src/main/scala/kafka/api/OffsetRequest.scala
index 879d60d..876022c 100644
--- a/core/src/main/scala/kafka/api/OffsetRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetRequest.scala
@@ -118,7 +118,7 @@ case class OffsetRequest(requestInfo: Map[TopicAndPartition, PartitionOffsetRequ
         (topicAndPartition, PartitionOffsetsResponse(Errors.forException(e), Nil))
     }
     val errorResponse = OffsetResponse(correlationId, partitionOffsetResponseMap)
-    requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
+    requestChannel.sendResponse(Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
   }
 
   override def describe(details: Boolean): String = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/b661d3b8/core/src/main/scala/kafka/api/ProducerRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ProducerRequest.scala b/core/src/main/scala/kafka/api/ProducerRequest.scala
index bd48388..38fef5b 100644
--- a/core/src/main/scala/kafka/api/ProducerRequest.scala
+++ b/core/src/main/scala/kafka/api/ProducerRequest.scala
@@ -130,14 +130,14 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion,
 
   override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
     if (request.body[org.apache.kafka.common.requests.ProduceRequest].acks == 0) {
-        requestChannel.closeConnection(request.processor, request)
+        requestChannel.sendResponse(new RequestChannel.Response(request, None, RequestChannel.CloseConnectionAction))
     }
     else {
       val producerResponseStatus = data.map { case (topicAndPartition, _) =>
         (topicAndPartition, ProducerResponseStatus(Errors.forException(e), -1l, Message.NoTimestamp))
       }
       val errorResponse = ProducerResponse(correlationId, producerResponseStatus)
-      requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
+      requestChannel.sendResponse(Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/b661d3b8/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
index 403152a..6bbcab5 100644
--- a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
+++ b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
@@ -64,7 +64,7 @@ case class TopicMetadataRequest(versionId: Short,
       topic => TopicMetadata(topic, Nil, Errors.forException(e))
     }
     val errorResponse = TopicMetadataResponse(Seq(), topicMetadata, correlationId)
-    requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
+    requestChannel.sendResponse(Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
   }
 
   override def describe(details: Boolean): String = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/b661d3b8/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 2d41869..bb19346 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -37,7 +37,7 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.utils.Time
 import org.apache.log4j.Logger
 
-import scala.reflect.{ClassTag, classTag}
+import scala.reflect.ClassTag
 
 object RequestChannel extends Logging {
   val AllDone = Request(processor = 1, connectionId = "2", Session(KafkaPrincipal.ANONYMOUS, InetAddress.getLocalHost),
@@ -195,15 +195,27 @@ object RequestChannel extends Logging {
     }
   }
 
-  case class Response(processor: Int, request: Request, responseSend: Send, responseAction: ResponseAction) {
+  object Response {
+
+    def apply(request: Request, responseSend: Send): Response = {
+      require(request != null, "request should be non null")
+      require(responseSend != null, "responseSend should be non null")
+      new Response(request, Some(responseSend), SendAction)
+    }
+
+    def apply(request: Request, response: AbstractResponse): Response = {
+      require(request != null, "request should be non null")
+      require(response != null, "response should be non null")
+      apply(request, response.toSend(request.connectionId, request.header))
+    }
+
+  }
+
+  case class Response(request: Request, responseSend: Option[Send], responseAction: ResponseAction) {
     request.responseCompleteTimeNanos = Time.SYSTEM.nanoseconds
     if (request.apiLocalCompleteTimeNanos == -1L) request.apiLocalCompleteTimeNanos = Time.SYSTEM.nanoseconds
 
-    def this(request: Request, responseSend: Send) =
-      this(request.processor, request, responseSend, if (responseSend == null) NoOpAction else SendAction)
-
-    def this(request: Request, response: AbstractResponse) =
-      this(request, response.toSend(request.connectionId, request.header))
+    def processor: Int = request.processor
   }
 
   trait ResponseAction
@@ -251,20 +263,6 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe
       onResponse(response.processor)
   }
 
-  /** No operation to take for the request, need to read more over the network */
-  def noOperation(processor: Int, request: RequestChannel.Request) {
-    responseQueues(processor).put(RequestChannel.Response(processor, request, null, RequestChannel.NoOpAction))
-    for(onResponse <- responseListeners)
-      onResponse(processor)
-  }
-
-  /** Close the connection for the request */
-  def closeConnection(processor: Int, request: RequestChannel.Request) {
-    responseQueues(processor).put(RequestChannel.Response(processor, request, null, RequestChannel.CloseConnectionAction))
-    for(onResponse <- responseListeners)
-      onResponse(processor)
-  }
-
   /** Get the next request or block until specified time has elapsed */
   def receiveRequest(timeout: Long): RequestChannel.Request =
     requestQueue.poll(timeout, TimeUnit.MILLISECONDS)

http://git-wip-us.apache.org/repos/asf/kafka/blob/b661d3b8/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 48d0233..414557e 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -33,7 +33,7 @@ import kafka.server.KafkaConfig
 import kafka.utils._
 import org.apache.kafka.common.errors.InvalidRequestException
 import org.apache.kafka.common.metrics._
-import org.apache.kafka.common.network.{ChannelBuilders, KafkaChannel, ListenerName, Selectable, Selector => KSelector}
+import org.apache.kafka.common.network.{ChannelBuilders, KafkaChannel, ListenerName, Selectable, Send, Selector => KSelector}
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.apache.kafka.common.protocol.types.SchemaException
@@ -466,7 +466,9 @@ private[kafka] class Processor(val id: Int,
             if (selector.channel(channelId) != null || selector.closingChannel(channelId) != null)
                 selector.unmute(channelId)
           case RequestChannel.SendAction =>
-            sendResponse(curr)
+            val responseSend = curr.responseSend.getOrElse(
+              throw new IllegalStateException(s"responseSend must be defined for SendAction, response: $curr"))
+            sendResponse(curr, responseSend)
           case RequestChannel.CloseConnectionAction =>
             updateRequestMetrics(curr.request)
             trace("Closing socket connection actively according to the response code.")
@@ -479,16 +481,16 @@ private[kafka] class Processor(val id: Int,
   }
 
   /* `protected` for test usage */
-  protected[network] def sendResponse(response: RequestChannel.Response) {
+  protected[network] def sendResponse(response: RequestChannel.Response, responseSend: Send) {
     trace(s"Socket server received response to send, registering for write and sending data: $response")
-    val channel = selector.channel(response.responseSend.destination)
+    val channel = selector.channel(responseSend.destination)
     // `channel` can be null if the selector closed the connection because it was idle for too long
     if (channel == null) {
       warn(s"Attempting to send response via channel for which there is no open connection, connection id $id")
       response.request.updateRequestMetrics(0L)
     }
     else {
-      selector.send(response.responseSend)
+      selector.send(responseSend)
       inflightResponses += (response.request.connectionId -> response)
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/b661d3b8/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index db15f72..1346fb3 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -34,7 +34,6 @@ import kafka.coordinator.group.{GroupCoordinator, JoinGroupResult}
 import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinator}
 import kafka.log.{Log, LogManager, TimestampOffset}
 import kafka.network.{RequestChannel, RequestOrResponseSend}
-import kafka.network.RequestChannel.{Response, Session}
 import kafka.security.auth._
 import kafka.utils.{Exit, Logging, ZKGroupTopicDirs, ZkUtils}
 import org.apache.kafka.common.errors._
@@ -167,11 +166,11 @@ class KafkaApis(val requestChannel: RequestChannel,
       if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
         val result = replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest, onLeadershipChange)
         val leaderAndIsrResponse = new LeaderAndIsrResponse(result.error, result.responseMap.asJava)
-        sendResponseExemptThrottle(request, new Response(request, leaderAndIsrResponse))
+        sendResponseExemptThrottle(RequestChannel.Response(request, leaderAndIsrResponse))
       } else {
         val result = leaderAndIsrRequest.partitionStates.asScala.keys.map((_, Errors.CLUSTER_AUTHORIZATION_FAILED)).toMap
-        def createResponse(throttleTimeMs: Int): AbstractResponse = new LeaderAndIsrResponse(Errors.CLUSTER_AUTHORIZATION_FAILED, result.asJava)
-        sendResponseMaybeThrottle(request, createResponse)
+        sendResponseMaybeThrottle(request, _ =>
+          new LeaderAndIsrResponse(Errors.CLUSTER_AUTHORIZATION_FAILED, result.asJava))
       }
     } catch {
       case e: FatalExitError => throw e
@@ -201,11 +200,11 @@ class KafkaApis(val requestChannel: RequestChannel,
         }
       }
       val response = new StopReplicaResponse(error, result.asJava)
-      sendResponseExemptThrottle(request, new Response(request, response))
+      sendResponseExemptThrottle(RequestChannel.Response(request, response))
     } else {
       val result = stopReplicaRequest.partitions.asScala.map((_, Errors.CLUSTER_AUTHORIZATION_FAILED)).toMap
-      def createResponse(throttleTimeMs: Int): AbstractResponse = new StopReplicaResponse(Errors.CLUSTER_AUTHORIZATION_FAILED, result.asJava)
-      sendResponseMaybeThrottle(request, createResponse)
+      sendResponseMaybeThrottle(request, _ =>
+        new StopReplicaResponse(Errors.CLUSTER_AUTHORIZATION_FAILED, result.asJava))
     }
 
     replicaManager.replicaFetcherManager.shutdownIdleFetcherThreads()
@@ -225,10 +224,9 @@ class KafkaApis(val requestChannel: RequestChannel,
         adminManager.tryCompleteDelayedTopicOperations(topic)
         }
       }
-      sendResponseExemptThrottle(request, new Response(request, new UpdateMetadataResponse(Errors.NONE)))
+      sendResponseExemptThrottle(RequestChannel.Response(request, new UpdateMetadataResponse(Errors.NONE)))
     } else {
-      def createResponse(throttleTimeMs: Int): AbstractResponse = new UpdateMetadataResponse(Errors.CLUSTER_AUTHORIZATION_FAILED)
-      sendResponseMaybeThrottle(request, createResponse)
+      sendResponseMaybeThrottle(request, _ => new UpdateMetadataResponse(Errors.CLUSTER_AUTHORIZATION_FAILED))
     }
   }
 
@@ -245,7 +243,8 @@ class KafkaApis(val requestChannel: RequestChannel,
         case Success(partitionsRemaining) =>
           val controlledShutdownResponse = new ControlledShutdownResponse(controlledShutdownRequest.correlationId,
             Errors.NONE, partitionsRemaining)
-          sendResponseExemptThrottle(request, new Response(request, new RequestOrResponseSend(request.connectionId, controlledShutdownResponse)))
+          sendResponseExemptThrottle(RequestChannel.Response(request,
+            new RequestOrResponseSend(request.connectionId, controlledShutdownResponse)))
         case Failure(throwable) =>
           sendResponseExemptThrottle(request, () => controlledShutdownRequest.handleError(throwable, requestChannel, request))
       }
@@ -266,8 +265,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       val results = offsetCommitRequest.offsetData.keySet.asScala.map { topicPartition =>
         (topicPartition, error)
       }.toMap
-      def createResponse(throttleTimeMs: Int): AbstractResponse = new OffsetCommitResponse(throttleTimeMs, results.asJava)
-      sendResponseMaybeThrottle(request, createResponse)
+      sendResponseMaybeThrottle(request, requestThrottleMs => new OffsetCommitResponse(requestThrottleMs, results.asJava))
     } else {
       val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = offsetCommitRequest.offsetData.asScala.toMap.partition {
         case (topicPartition, _) =>
@@ -296,8 +294,8 @@ class KafkaApis(val requestChannel: RequestChannel,
                 s"on partition $topicPartition failed due to ${error.exceptionName}")
             }
           }
-        def createResponse(throttleTimeMs: Int): AbstractResponse = new OffsetCommitResponse(throttleTimeMs, combinedCommitStatus.asJava)
-        sendResponseMaybeThrottle(request, createResponse)
+        sendResponseMaybeThrottle(request, requestThrottleMs =>
+          new OffsetCommitResponse(requestThrottleMs, combinedCommitStatus.asJava))
       }
 
       if (authorizedTopics.isEmpty)
@@ -364,7 +362,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
-  private def authorize(session: Session, operation: Operation, resource: Resource): Boolean =
+  private def authorize(session: RequestChannel.Session, operation: Operation, resource: Resource): Boolean =
     authorizer.forall(_.authorize(session, operation, resource))
 
   /**
@@ -375,9 +373,9 @@ class KafkaApis(val requestChannel: RequestChannel,
     val numBytesAppended = request.header.toStruct.sizeOf + request.bodyAndSize.size
 
     if (produceRequest.isTransactional && !authorize(request.session, Write, new Resource(ProducerTransactionalId, produceRequest.transactionalId())))
-      sendResponseMaybeThrottle(request, (throttleMs: Int) => produceRequest.getErrorResponse(throttleMs, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception()))
+      sendResponseMaybeThrottle(request, (throttleMs: Int) => produceRequest.getErrorResponse(throttleMs, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception))
     else if (produceRequest.isIdempotent && !authorize(request.session, Write, Resource.ProducerIdResource))
-      sendResponseMaybeThrottle(request, (throttleMs: Int) => produceRequest.getErrorResponse(throttleMs, Errors.PRODUCER_ID_AUTHORIZATION_FAILED.exception()))
+      sendResponseMaybeThrottle(request, (throttleMs: Int) => produceRequest.getErrorResponse(throttleMs, Errors.PRODUCER_ID_AUTHORIZATION_FAILED.exception))
     else {
       val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) =
         produceRequest.partitionRecordsOrFail.asScala.partition { case (tp, _) =>
@@ -413,25 +411,23 @@ class KafkaApis(val requestChannel: RequestChannel,
             // no operation needed if producer request.required.acks = 0; however, if there is any error in handling
             // the request, since no response is expected by the producer, the server will close socket server so that
             // the producer client will know that some error has happened and will refresh its metadata
-            if (errorInResponse) {
-              val exceptionsSummary = mergedResponseStatus.map { case (topicPartition, status) =>
-                topicPartition -> status.error.exceptionName
-              }.mkString(", ")
-              info(
-                s"Closing connection due to error during produce request with correlation id ${request.header.correlationId} " +
-                  s"from client id ${request.header.clientId} with ack=0\n" +
-                  s"Topic and partition to exceptions: $exceptionsSummary"
-              )
-              requestChannel.closeConnection(request.processor, request)
-            } else {
-              requestChannel.noOperation(request.processor, request)
-            }
+            val action =
+              if (errorInResponse) {
+                val exceptionsSummary = mergedResponseStatus.map { case (topicPartition, status) =>
+                  topicPartition -> status.error.exceptionName
+                }.mkString(", ")
+                info(
+                  s"Closing connection due to error during produce request with correlation id ${request.header.correlationId} " +
+                    s"from client id ${request.header.clientId} with ack=0\n" +
+                    s"Topic and partition to exceptions: $exceptionsSummary"
+                )
+                RequestChannel.CloseConnectionAction
+              } else RequestChannel.NoOpAction
+            sendResponseExemptThrottle(new RequestChannel.Response(request, None, action))
           } else {
-            def createResponseCallback(requestThrottleTimeMs: Int): AbstractResponse = {
-              new ProduceResponse(mergedResponseStatus.asJava, bandwidthThrottleTimeMs + requestThrottleTimeMs)
-            }
-
-            sendResponseMaybeThrottle(request, createResponseCallback)
+            sendResponseMaybeThrottle(request, requestThrottleMs =>
+              new ProduceResponse(mergedResponseStatus.asJava, bandwidthThrottleTimeMs + requestThrottleMs)
+            )
           }
         }
 
@@ -552,16 +548,16 @@ class KafkaApis(val requestChannel: RequestChannel,
             brokerTopicStats.updateBytesOut(topicPartition.topic, fetchRequest.isFromFollower, data.records.sizeInBytes)
           }
 
-          val responseSend = response.toSend(responseStruct, bandwidthThrottleTimeMs + requestThrottleTimeMs, request.connectionId, request.header)
-          new RequestChannel.Response(request, responseSend)
-        }
-        def sendResponseCallback(requestThrottleTimeMs: Int) {
-          requestChannel.sendResponse(createResponse(requestThrottleTimeMs))
+          val responseSend = response.toSend(responseStruct, bandwidthThrottleTimeMs + requestThrottleTimeMs,
+            request.connectionId, request.header)
+          RequestChannel.Response(request, responseSend)
         }
+
         if (fetchRequest.isFromFollower)
-          sendResponseExemptThrottle(request, createResponse(0))
+          sendResponseExemptThrottle(createResponse(0))
         else
-          sendResponseMaybeThrottle(request, request.header.clientId, sendResponseCallback)
+          sendResponseMaybeThrottle(request, request.header.clientId, requestThrottleMs =>
+            requestChannel.sendResponse(createResponse(requestThrottleMs)))
       }
 
       // When this callback is triggered, the remote API call has completed.
@@ -628,8 +624,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       else
         handleListOffsetRequestV1AndAbove(request)
 
-    def createResponse(throttleTimeMs: Int): AbstractResponse = new ListOffsetResponse(throttleTimeMs, mergedResponseMap.asJava)
-    sendResponseMaybeThrottle(request, createResponse)
+    sendResponseMaybeThrottle(request, requestThrottleMs => new ListOffsetResponse(requestThrottleMs, mergedResponseMap.asJava))
   }
 
   private def handleListOffsetRequestV0(request : RequestChannel.Request) : Map[TopicPartition, ListOffsetResponse.PartitionData] = {
@@ -968,15 +963,14 @@ class KafkaApis(val requestChannel: RequestChannel,
     trace("Sending topic metadata %s and brokers %s for correlation id %d to client %s".format(completeTopicMetadata.mkString(","),
       brokers.mkString(","), request.header.correlationId, request.header.clientId))
 
-    def createResponse(throttleTimeMs: Int): AbstractResponse = new MetadataResponse(
-      throttleTimeMs,
-      brokers.map(_.getNode(request.listenerName)).asJava,
-      clusterId,
-      metadataCache.getControllerId.getOrElse(MetadataResponse.NO_CONTROLLER_ID),
-      completeTopicMetadata.asJava
-    )
-
-    sendResponseMaybeThrottle(request, createResponse)
+    sendResponseMaybeThrottle(request, requestThrottleMs =>
+      new MetadataResponse(
+        requestThrottleMs,
+        brokers.map(_.getNode(request.listenerName)).asJava,
+        clusterId,
+        metadataCache.getControllerId.getOrElse(MetadataResponse.NO_CONTROLLER_ID),
+        completeTopicMetadata.asJava
+      ))
   }
 
   /**
@@ -989,11 +983,11 @@ class KafkaApis(val requestChannel: RequestChannel,
     def authorizeTopicDescribe(partition: TopicPartition) =
       authorize(request.session, Describe, new Resource(Topic, partition.topic))
 
-    def createResponse(throttleTimeMs: Int): AbstractResponse = {
+    def createResponse(requestThrottleMs: Int): AbstractResponse = {
       val offsetFetchResponse =
         // reject the request if not authorized to the group
         if (!authorize(request.session, Read, new Resource(Group, offsetFetchRequest.groupId)))
-          offsetFetchRequest.getErrorResponse(throttleTimeMs, Errors.GROUP_AUTHORIZATION_FAILED)
+          offsetFetchRequest.getErrorResponse(requestThrottleMs, Errors.GROUP_AUTHORIZATION_FAILED)
         else {
           if (header.apiVersion == 0) {
             val (authorizedPartitions, unauthorizedPartitions) = offsetFetchRequest.partitions.asScala
@@ -1023,17 +1017,17 @@ class KafkaApis(val requestChannel: RequestChannel,
             }.toMap
 
             val unauthorizedPartitionData = unauthorizedPartitions.map(_ -> OffsetFetchResponse.UNKNOWN_PARTITION).toMap
-            new OffsetFetchResponse(throttleTimeMs, Errors.NONE, (authorizedPartitionData ++ unauthorizedPartitionData).asJava)
+            new OffsetFetchResponse(requestThrottleMs, Errors.NONE, (authorizedPartitionData ++ unauthorizedPartitionData).asJava)
           } else {
             // versions 1 and above read offsets from Kafka
             if (offsetFetchRequest.isAllPartitions) {
               val (error, allPartitionData) = groupCoordinator.handleFetchOffsets(offsetFetchRequest.groupId)
               if (error != Errors.NONE)
-                offsetFetchRequest.getErrorResponse(throttleTimeMs, error)
+                offsetFetchRequest.getErrorResponse(requestThrottleMs, error)
               else {
                 // clients are not allowed to see offsets for topics that are not authorized for Describe
                 val authorizedPartitionData = allPartitionData.filter { case (topicPartition, _) => authorizeTopicDescribe(topicPartition) }
-                new OffsetFetchResponse(throttleTimeMs, Errors.NONE, authorizedPartitionData.asJava)
+                new OffsetFetchResponse(requestThrottleMs, Errors.NONE, authorizedPartitionData.asJava)
               }
             } else {
               val (authorizedPartitions, unauthorizedPartitions) = offsetFetchRequest.partitions.asScala
@@ -1041,10 +1035,10 @@ class KafkaApis(val requestChannel: RequestChannel,
               val (error, authorizedPartitionData) = groupCoordinator.handleFetchOffsets(offsetFetchRequest.groupId,
                 Some(authorizedPartitions))
               if (error != Errors.NONE)
-                offsetFetchRequest.getErrorResponse(throttleTimeMs, error)
+                offsetFetchRequest.getErrorResponse(requestThrottleMs, error)
               else {
                 val unauthorizedPartitionData = unauthorizedPartitions.map(_ -> OffsetFetchResponse.UNKNOWN_PARTITION).toMap
-                new OffsetFetchResponse(throttleTimeMs, Errors.NONE, (authorizedPartitionData ++ unauthorizedPartitionData).asJava)
+                new OffsetFetchResponse(requestThrottleMs, Errors.NONE, (authorizedPartitionData ++ unauthorizedPartitionData).asJava)
               }
             }
           }
@@ -1060,9 +1054,8 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     if (findCoordinatorRequest.coordinatorType == FindCoordinatorRequest.CoordinatorType.GROUP &&
       !authorize(request.session, Describe, new Resource(Group, findCoordinatorRequest.coordinatorKey))) {
-
-      def createResponse(throttleTimeMs: Int): AbstractResponse = new FindCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED, Node.noNode)
-      sendResponseMaybeThrottle(request, createResponse)
+      sendResponseMaybeThrottle(request, requestThrottleMs =>
+        new FindCoordinatorResponse(requestThrottleMs, Errors.GROUP_AUTHORIZATION_FAILED, Node.noNode))
     } else {
       // TODO: Authorize by transactional id if coordinator type is TRANSACTION
 
@@ -1082,19 +1075,19 @@ class KafkaApis(val requestChannel: RequestChannel,
           throw new InvalidRequestException("Unknown coordinator type in FindCoordinator request")
       }
 
-      def createResponse(throttleTimeMs: Int): AbstractResponse = {
+      def createResponse(requestThrottleMs: Int): AbstractResponse = {
         val responseBody = if (topicMetadata.error != Errors.NONE) {
-          new FindCoordinatorResponse(throttleTimeMs, Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)
+          new FindCoordinatorResponse(requestThrottleMs, Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)
         } else {
           val coordinatorEndpoint = topicMetadata.partitionMetadata.asScala
             .find(_.partition == partition)
-            .map(_.leader())
+            .map(_.leader)
 
           coordinatorEndpoint match {
             case Some(endpoint) if !endpoint.isEmpty =>
-              new FindCoordinatorResponse(throttleTimeMs, Errors.NONE, endpoint)
+              new FindCoordinatorResponse(requestThrottleMs, Errors.NONE, endpoint)
             case _ =>
-              new FindCoordinatorResponse(throttleTimeMs, Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)
+              new FindCoordinatorResponse(requestThrottleMs, Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)
           }
         }
         trace("Sending FindCoordinator response %s for correlation id %d to client %s."
@@ -1108,7 +1101,7 @@ class KafkaApis(val requestChannel: RequestChannel,
   def handleDescribeGroupRequest(request: RequestChannel.Request) {
     val describeRequest = request.body[DescribeGroupsRequest]
 
-    val groups = describeRequest.groupIds().asScala.map { groupId =>
+    val groups = describeRequest.groupIds.asScala.map { groupId =>
         if (!authorize(request.session, Describe, new Resource(Group, groupId))) {
           groupId -> DescribeGroupsResponse.GroupMetadata.forError(Errors.GROUP_AUTHORIZATION_FAILED)
         } else {
@@ -1123,19 +1116,18 @@ class KafkaApis(val requestChannel: RequestChannel,
         }
     }.toMap
 
-    def createResponse(throttleTimeMs: Int): AbstractResponse = new DescribeGroupsResponse(throttleTimeMs, groups.asJava)
-    sendResponseMaybeThrottle(request, createResponse)
+    sendResponseMaybeThrottle(request, requestThrottleMs => new DescribeGroupsResponse(requestThrottleMs, groups.asJava))
   }
 
   def handleListGroupsRequest(request: RequestChannel.Request) {
     if (!authorize(request.session, Describe, Resource.ClusterResource)) {
-      def createResponse(throttleTimeMs: Int): AbstractResponse = ListGroupsResponse.fromError(throttleTimeMs, Errors.CLUSTER_AUTHORIZATION_FAILED)
-      sendResponseMaybeThrottle(request, createResponse)
+      sendResponseMaybeThrottle(request, requestThrottleMs =>
+        ListGroupsResponse.fromError(requestThrottleMs, Errors.CLUSTER_AUTHORIZATION_FAILED))
     } else {
       val (error, groups) = groupCoordinator.handleListGroups()
       val allGroups = groups.map { group => new ListGroupsResponse.Group(group.groupId, group.protocolType) }
-      def createResponse(throttleTimeMs: Int): AbstractResponse = new ListGroupsResponse(throttleTimeMs, error, allGroups.asJava)
-      sendResponseMaybeThrottle(request, createResponse)
+      sendResponseMaybeThrottle(request, requestThrottleMs =>
+        new ListGroupsResponse(requestThrottleMs, error, allGroups.asJava))
     }
   }
 
@@ -1145,8 +1137,8 @@ class KafkaApis(val requestChannel: RequestChannel,
     // the callback for sending a join-group response
     def sendResponseCallback(joinResult: JoinGroupResult) {
       val members = joinResult.members map { case (memberId, metadataArray) => (memberId, ByteBuffer.wrap(metadataArray)) }
-      def createResponse(throttleTimeMs: Int): AbstractResponse = {
-        val responseBody = new JoinGroupResponse(throttleTimeMs, joinResult.error, joinResult.generationId,
+      def createResponse(requestThrottleMs: Int): AbstractResponse = {
+        val responseBody = new JoinGroupResponse(requestThrottleMs, joinResult.error, joinResult.generationId,
           joinResult.subProtocol, joinResult.memberId, joinResult.leaderId, members.asJava)
 
         trace("Sending join group response %s for correlation id %d to client %s."
@@ -1157,15 +1149,16 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
 
     if (!authorize(request.session, Read, new Resource(Group, joinGroupRequest.groupId()))) {
-      def createResponse(throttleTimeMs: Int): AbstractResponse = new JoinGroupResponse(
-        throttleTimeMs,
-        Errors.GROUP_AUTHORIZATION_FAILED,
-        JoinGroupResponse.UNKNOWN_GENERATION_ID,
-        JoinGroupResponse.UNKNOWN_PROTOCOL,
-        JoinGroupResponse.UNKNOWN_MEMBER_ID, // memberId
-        JoinGroupResponse.UNKNOWN_MEMBER_ID, // leaderId
-        Collections.emptyMap())
-      sendResponseMaybeThrottle(request, createResponse)
+      sendResponseMaybeThrottle(request, requestThrottleMs =>
+        new JoinGroupResponse(
+          requestThrottleMs,
+          Errors.GROUP_AUTHORIZATION_FAILED,
+          JoinGroupResponse.UNKNOWN_GENERATION_ID,
+          JoinGroupResponse.UNKNOWN_PROTOCOL,
+          JoinGroupResponse.UNKNOWN_MEMBER_ID, // memberId
+          JoinGroupResponse.UNKNOWN_MEMBER_ID, // leaderId
+          Collections.emptyMap())
+      )
     } else {
       // let the coordinator to handle join-group
       val protocols = joinGroupRequest.groupProtocols().asScala.map(protocol =>
@@ -1187,8 +1180,8 @@ class KafkaApis(val requestChannel: RequestChannel,
     val syncGroupRequest = request.body[SyncGroupRequest]
 
     def sendResponseCallback(memberState: Array[Byte], error: Errors) {
-      def createResponse(throttleTimeMs: Int): AbstractResponse = new SyncGroupResponse(throttleTimeMs, error, ByteBuffer.wrap(memberState))
-      sendResponseMaybeThrottle(request, createResponse)
+      sendResponseMaybeThrottle(request, requestThrottleMs =>
+        new SyncGroupResponse(requestThrottleMs, error, ByteBuffer.wrap(memberState)))
     }
 
     if (!authorize(request.session, Read, new Resource(Group, syncGroupRequest.groupId()))) {
@@ -1209,8 +1202,8 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     // the callback for sending a heartbeat response
     def sendResponseCallback(error: Errors) {
-      def createResponse(throttleTimeMs: Int): AbstractResponse = {
-        val response = new HeartbeatResponse(throttleTimeMs, error)
+      def createResponse(requestThrottleMs: Int): AbstractResponse = {
+        val response = new HeartbeatResponse(requestThrottleMs, error)
         trace("Sending heartbeat response %s for correlation id %d to client %s."
           .format(response, request.header.correlationId, request.header.clientId))
         response
@@ -1219,10 +1212,9 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
 
     if (!authorize(request.session, Read, new Resource(Group, heartbeatRequest.groupId))) {
-      def createResponse(throttleTimeMs: Int): AbstractResponse = new HeartbeatResponse(throttleTimeMs, Errors.GROUP_AUTHORIZATION_FAILED)
-      sendResponseMaybeThrottle(request, createResponse)
-    }
-    else {
+      sendResponseMaybeThrottle(request, requestThrottleMs =>
+        new HeartbeatResponse(requestThrottleMs, Errors.GROUP_AUTHORIZATION_FAILED))
+    } else {
       // let the coordinator to handle heartbeat
       groupCoordinator.handleHeartbeat(
         heartbeatRequest.groupId(),
@@ -1237,8 +1229,8 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     // the callback for sending a leave-group response
     def sendResponseCallback(error: Errors) {
-      def createResponse(throttleTimeMs: Int): AbstractResponse = {
-        val response = new LeaveGroupResponse(throttleTimeMs, error)
+      def createResponse(requestThrottleMs: Int): AbstractResponse = {
+        val response = new LeaveGroupResponse(requestThrottleMs, error)
         trace("Sending leave group response %s for correlation id %d to client %s."
                     .format(response, request.header.correlationId, request.header.clientId))
         response
@@ -1247,8 +1239,8 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
 
     if (!authorize(request.session, Read, new Resource(Group, leaveGroupRequest.groupId))) {
-      def createResponse(throttleTimeMs: Int): AbstractResponse = new LeaveGroupResponse(throttleTimeMs, Errors.GROUP_AUTHORIZATION_FAILED)
-      sendResponseMaybeThrottle(request, createResponse)
+      sendResponseMaybeThrottle(request, requestThrottleMs =>
+        new LeaveGroupResponse(requestThrottleMs, Errors.GROUP_AUTHORIZATION_FAILED))
     } else {
       // let the coordinator to handle leave-group
       groupCoordinator.handleLeaveGroup(
@@ -1259,8 +1251,7 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   def handleSaslHandshakeRequest(request: RequestChannel.Request) {
-    def createResponse(throttleTimeMs: Int): AbstractResponse = new SaslHandshakeResponse(Errors.ILLEGAL_SASL_STATE, config.saslEnabledMechanisms)
-    sendResponseMaybeThrottle(request, createResponse)
+    sendResponseMaybeThrottle(request, _ => new SaslHandshakeResponse(Errors.ILLEGAL_SASL_STATE, config.saslEnabledMechanisms))
   }
 
   def handleApiVersionsRequest(request: RequestChannel.Request) {
@@ -1270,12 +1261,12 @@ class KafkaApis(val requestChannel: RequestChannel,
     // If this is considered to leak information about the broker version a workaround is to use SSL
     // with client authentication which is performed at an earlier stage of the connection where the
     // ApiVersionRequest is not available.
-    def sendResponseCallback(throttleTimeMs: Int) {
+    def sendResponseCallback(requestThrottleMs: Int) {
       val responseSend =
         if (Protocol.apiVersionSupported(ApiKeys.API_VERSIONS.id, request.header.apiVersion))
-          ApiVersionsResponse.apiVersionsResponse(request.header.apiVersion, throttleTimeMs).toSend(request.connectionId, request.header)
+          ApiVersionsResponse.apiVersionsResponse(request.header.apiVersion, requestThrottleMs).toSend(request.connectionId, request.header)
         else ApiVersionsResponse.unsupportedVersionSend(request.connectionId, request.header)
-      requestChannel.sendResponse(new RequestChannel.Response(request, responseSend))
+      requestChannel.sendResponse(RequestChannel.Response(request, responseSend))
     }
     sendResponseMaybeThrottle(request, request.header.clientId, sendResponseCallback)
   }
@@ -1284,8 +1275,8 @@ class KafkaApis(val requestChannel: RequestChannel,
     val createTopicsRequest = request.body[CreateTopicsRequest]
 
     def sendResponseCallback(results: Map[String, ApiError]): Unit = {
-      def createResponse(throttleTimeMs: Int): AbstractResponse = {
-        val responseBody = new CreateTopicsResponse(throttleTimeMs, results.asJava)
+      def createResponse(requestThrottleMs: Int): AbstractResponse = {
+        val responseBody = new CreateTopicsResponse(requestThrottleMs, results.asJava)
         trace(s"Sending create topics response $responseBody for correlation id ${request.header.correlationId} to client ${request.header.clientId}.")
         responseBody
       }
@@ -1345,10 +1336,10 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
 
     def sendResponseCallback(results: Map[String, Errors]): Unit = {
-      def createResponse(throttleTimeMs: Int): AbstractResponse = {
+      def createResponse(requestThrottleMs: Int): AbstractResponse = {
         val completeResults = nonExistingOrUnauthorizedForDescribeTopics.map(topic => (topic, Errors.UNKNOWN_TOPIC_OR_PARTITION)).toMap ++
             unauthorizedForDeleteTopics.map(topic => (topic, Errors.TOPIC_AUTHORIZATION_FAILED)).toMap ++ results
-        val responseBody = new DeleteTopicsResponse(throttleTimeMs, completeResults.asJava)
+        val responseBody = new DeleteTopicsResponse(requestThrottleMs, completeResults.asJava)
         trace(s"Sending delete topics response $responseBody for correlation id ${request.header.correlationId} to client ${request.header.clientId}.")
         responseBody
       }
@@ -1404,8 +1395,8 @@ class KafkaApis(val requestChannel: RequestChannel,
         }
       }
 
-      def createResponse(throttleTimeMs: Int): AbstractResponse = new DeleteRecordsResponse(throttleTimeMs, mergedResponseStatus.asJava)
-      sendResponseMaybeThrottle(request, createResponse)
+      sendResponseMaybeThrottle(request, requestThrottleMs =>
+        new DeleteRecordsResponse(requestThrottleMs, mergedResponseStatus.asJava))
     }
 
     if (authorizedForDeleteTopics.isEmpty)
@@ -1425,12 +1416,13 @@ class KafkaApis(val requestChannel: RequestChannel,
 
 
     if (!authorize(request.session, Write, Resource.ProducerIdResource)) {
-      sendResponseMaybeThrottle(request, (throttleTime: Int) => new InitProducerIdResponse(throttleTime, Errors.PRODUCER_ID_AUTHORIZATION_FAILED))
+      sendResponseMaybeThrottle(request, requestThrottleMs =>
+        new InitProducerIdResponse(requestThrottleMs, Errors.PRODUCER_ID_AUTHORIZATION_FAILED))
     } else if (transactionalId == null || authorize(request.session, Write, new Resource(ProducerTransactionalId, transactionalId))) {
       // Send response callback
       def sendResponseCallback(result: InitProducerIdResult): Unit = {
-        def createResponse(throttleTimeMs: Int): AbstractResponse = {
-          val responseBody = new InitProducerIdResponse(throttleTimeMs, result.error, result.producerId, result.producerEpoch)
+        def createResponse(requestThrottleMs: Int): AbstractResponse = {
+          val responseBody = new InitProducerIdResponse(requestThrottleMs, result.error, result.producerId, result.producerEpoch)
           trace(s"Completed $transactionalId's InitProducerIdRequest with result $result from client ${request.header.clientId}.")
           responseBody
         }
@@ -1438,7 +1430,8 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
       txnCoordinator.handleInitProducerId(transactionalId, initProducerIdRequest.transactionTimeoutMs, sendResponseCallback)
     }else
-      sendResponseMaybeThrottle(request, (throttleTimeMs: Int) => new InitProducerIdResponse(throttleTimeMs, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED))
+      sendResponseMaybeThrottle(request, requestThrottleMs =>
+        new InitProducerIdResponse(requestThrottleMs, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED))
   }
 
   def handleEndTxnRequest(request: RequestChannel.Request): Unit = {
@@ -1447,8 +1440,8 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     if(authorize(request.session, Write, new Resource(ProducerTransactionalId, transactionalId))) {
       def sendResponseCallback(error: Errors) {
-        def createResponse(throttleTimeMs: Int): AbstractResponse = {
-          val responseBody = new EndTxnResponse(throttleTimeMs, error)
+        def createResponse(requestThrottleMs: Int): AbstractResponse = {
+          val responseBody = new EndTxnResponse(requestThrottleMs, error)
           trace(s"Completed ${endTxnRequest.transactionalId}'s EndTxnRequest with command: ${endTxnRequest.command}, errors: $error from client ${request.header.clientId}.")
           responseBody
         }
@@ -1461,7 +1454,8 @@ class KafkaApis(val requestChannel: RequestChannel,
         endTxnRequest.command,
         sendResponseCallback)
     } else
-      sendResponseMaybeThrottle(request, (throttleTimeMs: Int) => new EndTxnResponse(throttleTimeMs, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED))
+      sendResponseMaybeThrottle(request, requestThrottleMs =>
+        new EndTxnResponse(requestThrottleMs, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED))
   }
 
   def handleWriteTxnMarkersRequest(request: RequestChannel.Request): Unit = {
@@ -1472,7 +1466,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     val numAppends = new AtomicInteger(markers.size)
 
     if (numAppends.get == 0) {
-      sendResponseExemptThrottle(request, new RequestChannel.Response(request, new WriteTxnMarkersResponse(errors)))
+      sendResponseExemptThrottle(RequestChannel.Response(request, new WriteTxnMarkersResponse(errors)))
       return
     }
 
@@ -1497,7 +1491,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
 
       if (numAppends.decrementAndGet() == 0)
-        sendResponseExemptThrottle(request, new RequestChannel.Response(request, new WriteTxnMarkersResponse(errors)))
+        sendResponseExemptThrottle(RequestChannel.Response(request, new WriteTxnMarkersResponse(errors)))
     }
 
     // TODO: The current append API makes doing separate writes per producerId a little easier, but it would
@@ -1531,7 +1525,8 @@ class KafkaApis(val requestChannel: RequestChannel,
     val partitionsToAdd = addPartitionsToTxnRequest.partitions
 
     if(!authorize(request.session, Write, new Resource(ProducerTransactionalId, transactionalId)))
-      sendResponseMaybeThrottle(request, (throttleTimeMs: Int) => addPartitionsToTxnRequest.getErrorResponse(1, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception()))
+      sendResponseMaybeThrottle(request, requestThrottleMs =>
+        addPartitionsToTxnRequest.getErrorResponse(requestThrottleMs, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception()))
     else {
       val internalTopics = partitionsToAdd.asScala.filter {tp => org.apache.kafka.common.internals.Topic.isInternal(tp.topic())}
 
@@ -1554,12 +1549,13 @@ class KafkaApis(val requestChannel: RequestChannel,
           nonExistingOrUnauthorizedForDescribeTopics.map { tp => (tp, Errors.UNKNOWN_TOPIC_OR_PARTITION) }.toMap ++
           internalTopics.map { tp => (tp, Errors.TOPIC_AUTHORIZATION_FAILED) }
 
-        sendResponseMaybeThrottle(request, (throttleTimeMs: Int) => new AddPartitionsToTxnResponse(throttleTimeMs, partitionErrors.asJava))
+        sendResponseMaybeThrottle(request, requestThrottleMs =>
+          new AddPartitionsToTxnResponse(requestThrottleMs, partitionErrors.asJava))
       } else {
         // Send response callback
         def sendResponseCallback(error: Errors): Unit = {
-          def createResponse(throttleTimeMs: Int): AbstractResponse = {
-            val responseBody: AddPartitionsToTxnResponse = new AddPartitionsToTxnResponse(throttleTimeMs,
+          def createResponse(requestThrottleMs: Int): AbstractResponse = {
+            val responseBody: AddPartitionsToTxnResponse = new AddPartitionsToTxnResponse(requestThrottleMs,
               partitionsToAdd.asScala.map{tp => (tp, error)}.toMap.asJava)
             trace(s"Completed $transactionalId's AddPartitionsToTxnRequest with partitions $partitionsToAdd: errors: $error from client ${request.header.clientId}")
             responseBody
@@ -1586,14 +1582,16 @@ class KafkaApis(val requestChannel: RequestChannel,
     val offsetTopicPartition = new TopicPartition(GROUP_METADATA_TOPIC_NAME, groupCoordinator.partitionFor(groupId))
 
     if (!authorize(request.session, Write, new Resource(ProducerTransactionalId, transactionalId)))
-      sendResponseMaybeThrottle(request, (throttleTimeMs: Int) => new AddOffsetsToTxnResponse(throttleTimeMs, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED))
+      sendResponseMaybeThrottle(request, requestThrottleMs =>
+        new AddOffsetsToTxnResponse(requestThrottleMs, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED))
     else if (!authorize(request.session, Read, new Resource(Group, groupId)))
-      sendResponseMaybeThrottle(request, (throttleTimeMs: Int) => new AddOffsetsToTxnResponse(throttleTimeMs, Errors.GROUP_AUTHORIZATION_FAILED))
+      sendResponseMaybeThrottle(request, requestThrottleMs =>
+        new AddOffsetsToTxnResponse(requestThrottleMs, Errors.GROUP_AUTHORIZATION_FAILED))
     else {
         // Send response callback
         def sendResponseCallback(error: Errors): Unit = {
-          def createResponse(throttleTimeMs: Int): AbstractResponse = {
-            val responseBody: AddOffsetsToTxnResponse = new AddOffsetsToTxnResponse(throttleTimeMs, error)
+          def createResponse(requestThrottleMs: Int): AbstractResponse = {
+            val responseBody: AddOffsetsToTxnResponse = new AddOffsetsToTxnResponse(requestThrottleMs, error)
             trace(s"Completed $transactionalId's AddOffsetsToTxnRequest for group $groupId as on partition $offsetTopicPartition: errors: $error from client ${request.header.clientId}")
             responseBody
           }
@@ -1618,8 +1616,8 @@ class KafkaApis(val requestChannel: RequestChannel,
       val results = txnOffsetCommitRequest.offsets.keySet.asScala.map { topicPartition =>
         (topicPartition, error)
       }.toMap
-      def createResponse(throttleTimeMs: Int): AbstractResponse = new TxnOffsetCommitResponse(throttleTimeMs, results.asJava)
-      sendResponseMaybeThrottle(request, createResponse)
+      sendResponseMaybeThrottle(request, requestThrottleMs =>
+        new TxnOffsetCommitResponse(requestThrottleMs, results.asJava))
     } else {
       val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = txnOffsetCommitRequest.offsets.asScala.toMap.partition {
         case (topicPartition, _) =>
@@ -1649,8 +1647,8 @@ class KafkaApis(val requestChannel: RequestChannel,
                 s"on partition $topicPartition failed due to ${error.exceptionName}")
             }
           }
-        def createResponse(throttleTimeMs: Int): AbstractResponse = new TxnOffsetCommitResponse(throttleTimeMs, combinedCommitStatus.asJava)
-        sendResponseMaybeThrottle(request, createResponse)
+        sendResponseMaybeThrottle(request, requestThrottleMs =>
+          new TxnOffsetCommitResponse(requestThrottleMs, combinedCommitStatus.asJava))
       }
 
       if (authorizedTopics.isEmpty)
@@ -1685,29 +1683,24 @@ class KafkaApis(val requestChannel: RequestChannel,
     val describeAclsRequest = request.body[DescribeAclsRequest]
     authorizer match {
       case None =>
-        def createResponse(throttleTimeMs: Int): AbstractResponse =
-          new DescribeAclsResponse(throttleTimeMs, new SecurityDisabledException(
-            "No Authorizer is configured on the broker."), Collections.emptySet[AclBinding]);
-        sendResponseMaybeThrottle(request, createResponse)
+        sendResponseMaybeThrottle(request, requestThrottleMs =>
+          new DescribeAclsResponse(requestThrottleMs,
+            new SecurityDisabledException("No Authorizer is configured on the broker."),
+            Collections.emptySet()))
       case Some(auth) =>
         val filter = describeAclsRequest.filter()
-        var returnedAcls = new util.ArrayList[AclBinding]
-        val aclMap : Map[Resource, Set[Acl]] = auth.getAcls()
-        aclMap.foreach {
-          case (resource, acls) => {
-            acls.foreach {
-              case (acl) => {
-                val fixture = new AclBinding(new AdminResource(AdminResourceType.fromString(resource.resourceType.toString), resource.name),
-                    new AccessControlEntry(acl.principal.toString(), acl.host.toString(), acl.operation.toJava, acl.permissionType.toJava))
-                if (filter.matches(fixture))
-                  returnedAcls.add(fixture)
-              }
-            }
+        val returnedAcls = new util.ArrayList[AclBinding]
+        val aclMap = auth.getAcls()
+        aclMap.foreach { case (resource, acls) =>
+          acls.foreach { acl =>
+            val fixture = new AclBinding(new AdminResource(AdminResourceType.fromString(resource.resourceType.toString), resource.name),
+                new AccessControlEntry(acl.principal.toString, acl.host.toString, acl.operation.toJava, acl.permissionType.toJava))
+            if (filter.matches(fixture))
+              returnedAcls.add(fixture)
           }
         }
-        def createResponse(throttleTimeMs: Int): AbstractResponse =
-          new DescribeAclsResponse(throttleTimeMs, null, returnedAcls)
-        sendResponseMaybeThrottle(request, createResponse)
+        sendResponseMaybeThrottle(request, requestThrottleMs =>
+          new DescribeAclsResponse(requestThrottleMs, null, returnedAcls))
     }
   }
 
@@ -1780,14 +1773,13 @@ class KafkaApis(val requestChannel: RequestChannel,
     val createAclsRequest = request.body[CreateAclsRequest]
     authorizer match {
       case None =>
-        def createResponse(throttleTimeMs: Int): AbstractResponse =
-          createAclsRequest.getErrorResponse(throttleTimeMs,
-            new SecurityDisabledException("No Authorizer is configured on the broker."))
-        sendResponseMaybeThrottle(request, createResponse)
+        sendResponseMaybeThrottle(request, requestThrottleMs =>
+          createAclsRequest.getErrorResponse(requestThrottleMs,
+            new SecurityDisabledException("No Authorizer is configured on the broker.")))
       case Some(auth) =>
         val errors = mutable.HashMap[Int, Throwable]()
-        var creations = ListBuffer[(Resource, Acl)]()
-        for (i <- 0 to createAclsRequest.aclCreations().size() - 1) {
+        val creations = ListBuffer[(Resource, Acl)]()
+        for (i <- 0 until createAclsRequest.aclCreations.size) {
           val result = toScala(createAclsRequest.aclCreations.get(i).acl.toFilter)
           result match {
             case Failure(throwable) => errors.put(i, throwable)
@@ -1796,7 +1788,7 @@ class KafkaApis(val requestChannel: RequestChannel,
                     !resource.name.equals(Resource.ClusterResourceName))
                   throw new InvalidRequestException("The only valid name for the CLUSTER resource is " +
                       Resource.ClusterResourceName)
-                if (resource.name.isEmpty())
+                if (resource.name.isEmpty)
                   throw new InvalidRequestException("Invalid empty resource name")
                 auth.addAcls(immutable.Set(acl), resource)
               } catch {
@@ -1804,16 +1796,15 @@ class KafkaApis(val requestChannel: RequestChannel,
               }
           }
         }
-        var aclCreationResults = new java.util.ArrayList[AclCreationResponse]
+        val aclCreationResults = new java.util.ArrayList[AclCreationResponse]
         for (i <- 0 to createAclsRequest.aclCreations().size() - 1) {
           errors.get(i) match {
             case Some(throwable) => aclCreationResults.add(new AclCreationResponse(throwable))
             case None => aclCreationResults.add(new AclCreationResponse(null))
           }
         }
-        def createResponse(throttleTimeMs: Int): AbstractResponse =
-          new CreateAclsResponse(throttleTimeMs, aclCreationResults)
-        sendResponseMaybeThrottle(request, createResponse)
+        sendResponseMaybeThrottle(request, requestThrottleMs =>
+          new CreateAclsResponse(requestThrottleMs, aclCreationResults))
     }
   }
 
@@ -1822,33 +1813,28 @@ class KafkaApis(val requestChannel: RequestChannel,
     val deleteAclsRequest = request.body[DeleteAclsRequest]
     authorizer match {
       case None =>
-        def createResponse(throttleTimeMs: Int): AbstractResponse =
-          deleteAclsRequest.getErrorResponse(throttleTimeMs,
-            new SecurityDisabledException("No Authorizer is configured on the broker."))
-        sendResponseMaybeThrottle(request, createResponse)
+        sendResponseMaybeThrottle(request, requestThrottleMs =>
+          deleteAclsRequest.getErrorResponse(requestThrottleMs,
+            new SecurityDisabledException("No Authorizer is configured on the broker.")))
       case Some(auth) =>
         val filterResponseMap = mutable.HashMap[Int, AclFilterResponse]()
-        var toDelete = mutable.HashMap[Int, ListBuffer[(Resource, Acl)]]()
+        val toDelete = mutable.HashMap[Int, ListBuffer[(Resource, Acl)]]()
         for (i <- 0 to deleteAclsRequest.filters().size - 1) {
           toDelete.put(i, new ListBuffer[(Resource, Acl)]())
         }
         if (deleteAclsRequest.filters().asScala.exists { f => !f.matchesAtMostOne() }) {
           // Delete based on filters that may match more than one ACL.
           val aclMap : Map[Resource, Set[Acl]] = auth.getAcls()
-          aclMap.foreach {
-            case (resource, acls) => {
-              acls.foreach {
-                case (acl) => {
-                  val binding = new AclBinding(new AdminResource(AdminResourceType.
-                      fromString(resource.resourceType.toString), resource.name),
-                    new AccessControlEntry(acl.principal.toString(), acl.host.toString(),
-                      acl.operation.toJava, acl.permissionType.toJava))
-                  for (i <- 0 to deleteAclsRequest.filters().size - 1) {
-                    val filter = deleteAclsRequest.filters().get(i)
-                    if (filter.matches(binding)) {
-                      toDelete.get(i).get += ((resource, acl))
-                    }
-                  }
+          aclMap.foreach { case (resource, acls) =>
+            acls.foreach { acl =>
+              val binding = new AclBinding(new AdminResource(AdminResourceType.
+                fromString(resource.resourceType.toString), resource.name),
+                new AccessControlEntry(acl.principal.toString(), acl.host.toString(),
+                  acl.operation.toJava, acl.permissionType.toJava))
+              for (i <- 0 to deleteAclsRequest.filters().size - 1) {
+                val filter = deleteAclsRequest.filters().get(i)
+                if (filter.matches(binding)) {
+                  toDelete.get(i).get += ((resource, acl))
                 }
               }
             }
@@ -1885,9 +1871,7 @@ class KafkaApis(val requestChannel: RequestChannel,
           filterResponses.add(filterResponseMap.getOrElse(i,
             new AclFilterResponse(null, new util.ArrayList[AclDeletionResult]())))
         }
-        def createResponse(throttleTimeMs: Int): AbstractResponse =
-          new DeleteAclsResponse(throttleTimeMs, filterResponses)
-        sendResponseMaybeThrottle(request, createResponse)
+        sendResponseMaybeThrottle(request, requestThrottleMs => new DeleteAclsResponse(requestThrottleMs, filterResponses))
     }
   }
 
@@ -1899,13 +1883,13 @@ class KafkaApis(val requestChannel: RequestChannel,
     val responseBody = new OffsetsForLeaderEpochResponse(
       replicaManager.lastOffsetForLeaderEpoch(requestInfo.asScala).asJava
     )
-    sendResponseExemptThrottle(request, new RequestChannel.Response(request, responseBody))
+    sendResponseExemptThrottle(RequestChannel.Response(request, responseBody))
   }
 
   private def handleError(request: RequestChannel.Request, e: Throwable) {
     val mayThrottle = e.isInstanceOf[ClusterAuthorizationException] || !ApiKeys.forId(request.requestId).clusterAction
     if (request.requestObj != null) {
-      def sendResponseCallback(throttleTimeMs: Int) {
+      def sendResponseCallback(requestThrottleMs: Int) {
         request.requestObj.handleError(e, requestChannel, request)
         error("Error when handling request %s".format(request.requestObj), e)
       }
@@ -1920,20 +1904,21 @@ class KafkaApis(val requestChannel: RequestChannel,
       } else
         sendResponseExemptThrottle(request, () => sendResponseCallback(0))
     } else {
-      def createResponse(throttleTimeMs: Int): AbstractResponse = {
-        val response = request.body[AbstractRequest].getErrorResponse(throttleTimeMs, e)
-
+      def createResponse(requestThrottleMs: Int): RequestChannel.Response = {
+        val response = request.body[AbstractRequest].getErrorResponse(requestThrottleMs, e)
         /* If request doesn't have a default error response, we just close the connection.
            For example, when produce request has acks set to 0 */
         if (response == null)
-          requestChannel.closeConnection(request.processor, request)
-        response
+          new RequestChannel.Response(request, None, RequestChannel.CloseConnectionAction)
+        else RequestChannel.Response(request, response)
       }
       error("Error when handling request %s".format(request.body[AbstractRequest]), e)
       if (mayThrottle)
-        sendResponseMaybeThrottle(request, createResponse)
+        sendResponseMaybeThrottle(request, request.header.clientId, { requestThrottleMs =>
+          requestChannel.sendResponse(createResponse(requestThrottleMs))
+        })
       else
-        sendResponseExemptThrottle(request, new RequestChannel.Response(request, createResponse(0)))
+        sendResponseExemptThrottle(createResponse(0))
     }
   }
 
@@ -1954,7 +1939,8 @@ class KafkaApis(val requestChannel: RequestChannel,
     val unauthorizedResult = unauthorizedResources.keys.map { resource =>
       resource -> configsAuthorizationApiError(request.session, resource)
     }
-    sendResponseMaybeThrottle(request, new AlterConfigsResponse(_, (authorizedResult ++ unauthorizedResult).asJava))
+    sendResponseMaybeThrottle(request, requestThrottleMs =>
+      new AlterConfigsResponse(requestThrottleMs, (authorizedResult ++ unauthorizedResult).asJava))
   }
 
   private def configsAuthorizationApiError(session: RequestChannel.Session, resource: RResource): ApiError = {
@@ -1992,7 +1978,8 @@ class KafkaApis(val requestChannel: RequestChannel,
       resource -> new DescribeConfigsResponse.Config(error, Collections.emptyList[DescribeConfigsResponse.ConfigEntry])
     }
 
-    sendResponseMaybeThrottle(request, new DescribeConfigsResponse(_, (authorizedConfigs ++ unauthorizedConfigs).asJava))
+    sendResponseMaybeThrottle(request, requestThrottleMs =>
+      new DescribeConfigsResponse(requestThrottleMs, (authorizedConfigs ++ unauthorizedConfigs).asJava))
   }
 
   def authorizeClusterAction(request: RequestChannel.Request): Unit = {
@@ -2001,12 +1988,9 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   private def sendResponseMaybeThrottle(request: RequestChannel.Request, createResponse: Int => AbstractResponse) {
-    def sendResponseCallback(throttleTimeMs: Int) {
-      val response = createResponse(throttleTimeMs)
-      if (response != null)
-        sendResponse(request, response)
-    }
-    sendResponseMaybeThrottle(request, request.header.clientId, sendResponseCallback)
+    sendResponseMaybeThrottle(request, request.header.clientId, { requestThrottleMs =>
+      sendResponse(request, createResponse(requestThrottleMs))
+    })
   }
 
   private def sendResponseMaybeThrottle(request: RequestChannel.Request, clientId: String, sendResponseCallback: Int => Unit) {
@@ -2027,8 +2011,8 @@ class KafkaApis(val requestChannel: RequestChannel,
         sendResponseCallback)
   }
 
-  private def sendResponseExemptThrottle(request: RequestChannel.Request, response: Response) {
-    sendResponseExemptThrottle(request, () => requestChannel.sendResponse(response))
+  private def sendResponseExemptThrottle(response: RequestChannel.Response) {
+    sendResponseExemptThrottle(response.request, () => requestChannel.sendResponse(response))
   }
 
   private def sendResponseExemptThrottle(request: RequestChannel.Request, sendResponseCallback: () => Unit) {
@@ -2042,7 +2026,7 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   private def sendResponse(request: RequestChannel.Request, response: AbstractResponse) {
-    requestChannel.sendResponse(new Response(request, response))
+    requestChannel.sendResponse(RequestChannel.Response(request, response))
   }
 
   private def nanosToPercentage(nanos: Long): Double = nanos * ClientQuotaManagerConfig.NanosToPercentagePerSecond

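For illustration only, a minimal sketch (not part of this commit) of the two response-sending idioms that KafkaApis converges on after this refactor. The handler names and `ExampleResponse` are hypothetical placeholders; the helper signatures and constructors are the ones visible in the diff above.

    // Sketch only -- assumes the KafkaApis class context shown in the diff above.
    // `ExampleResponse` (an AbstractResponse subclass) and both handler names are hypothetical.

    // Quota-governed path: pass a function from the computed throttle time to the response.
    def handleExampleRequest(request: RequestChannel.Request): Unit = {
      val error = Errors.NONE
      sendResponseMaybeThrottle(request, requestThrottleMs =>
        new ExampleResponse(requestThrottleMs, error))
    }

    // Quota-exempt path: build the RequestChannel.Response up front and hand it over whole;
    // the request is recovered from the response, so the redundant request parameter is gone.
    def handleExampleClusterAction(request: RequestChannel.Request): Unit = {
      authorizeClusterAction(request)
      sendResponseExemptThrottle(RequestChannel.Response(request, new ExampleResponse(0, Errors.NONE)))
    }

    // When there is nothing to send back (e.g. an errored produce request with acks=0),
    // the intent is now explicit instead of returning a null response:
    //   new RequestChannel.Response(request, None, RequestChannel.CloseConnectionAction)
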
http://git-wip-us.apache.org/repos/asf/kafka/blob/b661d3b8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 7678550..acf96e8 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -30,7 +30,7 @@ import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.network.{ListenerName, NetworkSend}
+import org.apache.kafka.common.network.{ListenerName, NetworkSend, Send}
 import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol}
 import org.apache.kafka.common.record.{MemoryRecords, RecordBatch}
 import org.apache.kafka.common.requests.{AbstractRequest, ProduceRequest, RequestHeader}
@@ -98,7 +98,7 @@ class SocketServerTest extends JUnitSuite {
     byteBuffer.rewind()
 
     val send = new NetworkSend(request.connectionId, byteBuffer)
-    channel.sendResponse(new RequestChannel.Response(request, send))
+    channel.sendResponse(RequestChannel.Response(request, send))
   }
 
   def connect(s: SocketServer = server, protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT) = {
@@ -173,13 +173,13 @@ class SocketServerTest extends JUnitSuite {
     val plainSocket = connect(protocol = SecurityProtocol.PLAINTEXT)
     val serializedBytes = producerRequestBytes
 
-    for (i <- 0 until 10)
+    for (_ <- 0 until 10)
       sendRequest(plainSocket, serializedBytes)
     plainSocket.close()
-    for (i <- 0 until 10) {
+    for (_ <- 0 until 10) {
       val request = server.requestChannel.receiveRequest(2000)
       assertNotNull("receiveRequest timed out", request)
-      server.requestChannel.noOperation(request.processor, request)
+      server.requestChannel.sendResponse(RequestChannel.Response(request, None, RequestChannel.NoOpAction))
     }
   }
 
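For reference, a hedged sketch of the test-side idiom this hunk switches to: instead of the old `requestChannel.noOperation(processor, request)` call, the test now sends a response carrying an explicit action. The helper name `drainRequestWithNoOp` is hypothetical; the constructor is the one used in the hunk above.

    // Sketch only -- hypothetical helper showing the action-based API used in the test above.
    def drainRequestWithNoOp(channel: RequestChannel, request: RequestChannel.Request): Unit =
      channel.sendResponse(RequestChannel.Response(request, None, RequestChannel.NoOpAction))
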
@@ -331,9 +331,9 @@ class SocketServerTest extends JUnitSuite {
                                 protocol: SecurityProtocol): Processor = {
         new Processor(id, time, config.socketRequestMaxBytes, requestChannel, connectionQuotas,
           config.connectionsMaxIdleMs, listenerName, protocol, config, metrics, credentialProvider) {
-          override protected[network] def sendResponse(response: RequestChannel.Response) {
+          override protected[network] def sendResponse(response: RequestChannel.Response, responseSend: Send) {
             conn.close()
-            super.sendResponse(response)
+            super.sendResponse(response, responseSend)
           }
         }
       }
@@ -357,7 +357,7 @@ class SocketServerTest extends JUnitSuite {
       // detected. If the buffer is larger than 102400 bytes, a second write is attempted and it fails with an
       // IOException.
       val send = new NetworkSend(request.connectionId, ByteBuffer.allocate(550000))
-      channel.sendResponse(new RequestChannel.Response(request, send))
+      channel.sendResponse(RequestChannel.Response(request, send))
       TestUtils.waitUntilTrue(() => totalTimeHistCount() == expectedTotalTimeCount,
         s"request metrics not updated, expected: $expectedTotalTimeCount, actual: ${totalTimeHistCount()}")