You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/02/08 18:16:47 UTC
[kafka] branch 1.1 updated: MINOR: Move processor response queue into Processor (#4542)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/1.1 by this push:
new cc8bfe1 MINOR: Move processor response queue into Processor (#4542)
cc8bfe1 is described below
commit cc8bfe1f802ebf5294e8c92138abda0d65625b08
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Thu Feb 8 10:14:07 2018 -0800
MINOR: Move processor response queue into Processor (#4542)
Reviewers: Rajini Sivaram <ra...@googlemail.com>
---
.../main/scala/kafka/network/RequestChannel.scala | 55 ++++++----------------
.../main/scala/kafka/network/SocketServer.scala | 53 ++++++++++++++-------
2 files changed, 51 insertions(+), 57 deletions(-)
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 6ec42a6..144632c 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -242,36 +242,26 @@ object RequestChannel extends Logging {
class RequestChannel(val queueSize: Int) extends KafkaMetricsGroup {
val metrics = new RequestChannel.Metrics
- private var responseListeners: List[(Int) => Unit] = Nil
private val requestQueue = new ArrayBlockingQueue[BaseRequest](queueSize)
- private val responseQueues = new ConcurrentHashMap[Int, BlockingQueue[RequestChannel.Response]]()
+ private val processors = new ConcurrentHashMap[Int, Processor]()
- newGauge(
- "RequestQueueSize",
- new Gauge[Int] {
+ newGauge("RequestQueueSize", new Gauge[Int] {
def value = requestQueue.size
- }
- )
+ })
newGauge("ResponseQueueSize", new Gauge[Int]{
- def value = responseQueues.values.asScala.foldLeft(0) {(total, q) => total + q.size()}
+ def value = processors.values.asScala.foldLeft(0) {(total, processor) =>
+ total + processor.responseQueueSize
+ }
})
- def addProcessor(processorId: Int): Unit = {
- val responseQueue = new LinkedBlockingQueue[RequestChannel.Response]()
- if (responseQueues.putIfAbsent(processorId, responseQueue) != null)
- warn(s"Unexpected processor with processorId $processorId")
- newGauge("ResponseQueueSize",
- new Gauge[Int] {
- def value = responseQueue.size()
- },
- Map("processor" -> processorId.toString)
- )
+ def addProcessor(processor: Processor): Unit = {
+ if (processors.putIfAbsent(processor.id, processor) != null)
+ warn(s"Unexpected processor with processorId ${processor.id}")
}
def removeProcessor(processorId: Int): Unit = {
- removeMetric("ResponseQueueSize", Map("processor" -> processorId.toString))
- responseQueues.remove(processorId)
+ processors.remove(processorId)
}
/** Send a request to be handled, potentially blocking until there is room in the queue for the request */
@@ -294,13 +284,11 @@ class RequestChannel(val queueSize: Int) extends KafkaMetricsGroup {
trace(message)
}
- val responseQueue = responseQueues.get(response.processor)
- // `responseQueue` may be null if the processor was shutdown. In this case, the connections
+ val processor = processors.get(response.processor)
+ // The processor may be null if it was shutdown. In this case, the connections
// are closed, so the response is dropped.
- if (responseQueue != null) {
- responseQueue.put(response)
- for (onResponse <- responseListeners)
- onResponse(response.processor)
+ if (processor != null) {
+ processor.enqueueResponse(response)
}
}
@@ -312,21 +300,6 @@ class RequestChannel(val queueSize: Int) extends KafkaMetricsGroup {
def receiveRequest(): RequestChannel.BaseRequest =
requestQueue.take()
- /** Get a response for the given processor if there is one */
- def receiveResponse(processor: Int): RequestChannel.Response = {
- val responseQueue = responseQueues.get(processor)
- if (responseQueue == null)
- throw new IllegalStateException(s"receiveResponse with invalid processor $processor: processors=${responseQueues.keySet}")
- val response = responseQueue.poll()
- if (response != null)
- response.request.responseDequeueTimeNanos = Time.SYSTEM.nanoseconds
- response
- }
-
- def addResponseListener(onResponse: Int => Unit) {
- responseListeners ::= onResponse
- }
-
def updateErrorMetrics(apiKey: ApiKeys, errors: collection.Map[Errors, Integer]) {
errors.foreach { case (error, count) =>
metrics(apiKey.name).markErrorMeter(error, count)
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index c11ebcb..fef412b 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -119,15 +119,15 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
val recvBufferSize = config.socketReceiveBufferBytes
val brokerId = config.brokerId
- val numProcessorThreads = config.numNetworkThreads
endpoints.foreach { endpoint =>
val listenerName = endpoint.listenerName
val securityProtocol = endpoint.securityProtocol
val listenerProcessors = new ArrayBuffer[Processor]()
for (i <- 0 until newProcessorsPerListener) {
- listenerProcessors += newProcessor(nextProcessorId, connectionQuotas, listenerName, securityProtocol, memoryPool)
- requestChannel.addProcessor(nextProcessorId)
+ val processor = newProcessor(nextProcessorId, connectionQuotas, listenerName, securityProtocol, memoryPool)
+ listenerProcessors += processor
+ requestChannel.addProcessor(processor)
nextProcessorId += 1
}
listenerProcessors.foreach(p => processors.put(p.id, p))
@@ -140,21 +140,14 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
}
}
- // register the processor threads for notification of responses
- requestChannel.addResponseListener(id => {
- val processor = processors.get(id)
- if (processor != null)
- processor.wakeup()
- })
-
/**
* Stop processing requests and new connections.
*/
def stopProcessingRequests() = {
info("Stopping socket server request processors")
this.synchronized {
- acceptors.asScala.values.foreach(_.shutdown)
- processors.asScala.values.foreach(_.shutdown)
+ acceptors.asScala.values.foreach(_.shutdown())
+ processors.asScala.values.foreach(_.shutdown())
requestChannel.clear()
stoppedProcessingRequests = true
}
@@ -475,11 +468,20 @@ private[kafka] class Processor(val id: Int,
private val newConnections = new ConcurrentLinkedQueue[SocketChannel]()
private val inflightResponses = mutable.Map[String, RequestChannel.Response]()
+ private val responseQueue = new LinkedBlockingDeque[RequestChannel.Response]()
+
private[kafka] val metricTags = mutable.LinkedHashMap(
"listener" -> listenerName.value,
"networkProcessor" -> id.toString
).asJava
+ newGauge("ResponseQueueSize",
+ new Gauge[Int] {
+ def value = responseQueue.size()
+ },
+ Map("processor" -> id.toString)
+ )
+
newGauge("IdlePercent",
new Gauge[Double] {
def value = {
@@ -562,7 +564,7 @@ private[kafka] class Processor(val id: Int,
private def processChannelException(channelId: String, errorMessage: String, throwable: Throwable) {
if (openOrClosingChannel(channelId).isDefined) {
- error(s"Closing socket for ${channelId} because of error", throwable)
+ error(s"Closing socket for $channelId because of error", throwable)
close(channelId)
}
processException(errorMessage, throwable)
@@ -570,7 +572,7 @@ private[kafka] class Processor(val id: Int,
private def processNewResponses() {
var curr: RequestChannel.Response = null
- while ({curr = requestChannel.receiveResponse(id); curr != null}) {
+ while ({curr = dequeueResponse(); curr != null}) {
val channelId = curr.request.context.connectionId
try {
curr.responseAction match {
@@ -753,6 +755,20 @@ private[kafka] class Processor(val id: Int,
connId
}
+ private[network] def enqueueResponse(response: RequestChannel.Response): Unit = {
+ responseQueue.put(response)
+ wakeup()
+ }
+
+ private def dequeueResponse(): RequestChannel.Response = {
+ val response = responseQueue.poll()
+ if (response != null)
+ response.request.responseDequeueTimeNanos = Time.SYSTEM.nanoseconds
+ response
+ }
+
+ private[network] def responseQueueSize = responseQueue.size
+
// Only for testing
private[network] def inflightResponseCount: Int = inflightResponses.size
@@ -772,8 +788,13 @@ private[kafka] class Processor(val id: Int,
/**
* Wakeup the thread for selection.
*/
- @Override
- def wakeup = selector.wakeup()
+ override def wakeup() = selector.wakeup()
+
+ override def shutdown(): Unit = {
+ super.shutdown()
+ removeMetric("ResponseQueueSize", Map("processor" -> id.toString))
+ removeMetric("IdlePercent", Map("networkProcessor" -> id.toString))
+ }
}
--
To stop receiving notification emails like this one, please contact
jgus@apache.org.