You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/02/08 18:16:47 UTC

[kafka] branch 1.1 updated: MINOR: Move processor response queue into Processor (#4542)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
     new cc8bfe1  MINOR: Move processor response queue into Processor (#4542)
cc8bfe1 is described below

commit cc8bfe1f802ebf5294e8c92138abda0d65625b08
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Thu Feb 8 10:14:07 2018 -0800

    MINOR: Move processor response queue into Processor (#4542)
    
    Reviewers: Rajini Sivaram <ra...@googlemail.com>
---
 .../main/scala/kafka/network/RequestChannel.scala  | 55 ++++++----------------
 .../main/scala/kafka/network/SocketServer.scala    | 53 ++++++++++++++-------
 2 files changed, 51 insertions(+), 57 deletions(-)

diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 6ec42a6..144632c 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -242,36 +242,26 @@ object RequestChannel extends Logging {
 
 class RequestChannel(val queueSize: Int) extends KafkaMetricsGroup {
   val metrics = new RequestChannel.Metrics
-  private var responseListeners: List[(Int) => Unit] = Nil
   private val requestQueue = new ArrayBlockingQueue[BaseRequest](queueSize)
-  private val responseQueues = new ConcurrentHashMap[Int, BlockingQueue[RequestChannel.Response]]()
+  private val processors = new ConcurrentHashMap[Int, Processor]()
 
-  newGauge(
-    "RequestQueueSize",
-    new Gauge[Int] {
+  newGauge("RequestQueueSize", new Gauge[Int] {
       def value = requestQueue.size
-    }
-  )
+  })
 
   newGauge("ResponseQueueSize", new Gauge[Int]{
-    def value = responseQueues.values.asScala.foldLeft(0) {(total, q) => total + q.size()}
+    def value = processors.values.asScala.foldLeft(0) {(total, processor) =>
+      total + processor.responseQueueSize
+    }
   })
 
-  def addProcessor(processorId: Int): Unit = {
-    val responseQueue = new LinkedBlockingQueue[RequestChannel.Response]()
-    if (responseQueues.putIfAbsent(processorId, responseQueue) != null)
-      warn(s"Unexpected processor with processorId $processorId")
-    newGauge("ResponseQueueSize",
-      new Gauge[Int] {
-        def value = responseQueue.size()
-      },
-      Map("processor" -> processorId.toString)
-    )
+  def addProcessor(processor: Processor): Unit = {
+    if (processors.putIfAbsent(processor.id, processor) != null)
+      warn(s"Unexpected processor with processorId ${processor.id}")
   }
 
   def removeProcessor(processorId: Int): Unit = {
-    removeMetric("ResponseQueueSize", Map("processor" -> processorId.toString))
-    responseQueues.remove(processorId)
+    processors.remove(processorId)
   }
 
   /** Send a request to be handled, potentially blocking until there is room in the queue for the request */
@@ -294,13 +284,11 @@ class RequestChannel(val queueSize: Int) extends KafkaMetricsGroup {
       trace(message)
     }
 
-    val responseQueue = responseQueues.get(response.processor)
-    // `responseQueue` may be null if the processor was shutdown. In this case, the connections
+    val processor = processors.get(response.processor)
+    // The processor may be null if it was shut down. In this case, the connections
     // are closed, so the response is dropped.
-    if (responseQueue != null) {
-      responseQueue.put(response)
-      for (onResponse <- responseListeners)
-        onResponse(response.processor)
+    if (processor != null) {
+      processor.enqueueResponse(response)
     }
   }
 
@@ -312,21 +300,6 @@ class RequestChannel(val queueSize: Int) extends KafkaMetricsGroup {
   def receiveRequest(): RequestChannel.BaseRequest =
     requestQueue.take()
 
-  /** Get a response for the given processor if there is one */
-  def receiveResponse(processor: Int): RequestChannel.Response = {
-    val responseQueue = responseQueues.get(processor)
-    if (responseQueue == null)
-      throw new IllegalStateException(s"receiveResponse with invalid processor $processor: processors=${responseQueues.keySet}")
-    val response = responseQueue.poll()
-    if (response != null)
-      response.request.responseDequeueTimeNanos = Time.SYSTEM.nanoseconds
-    response
-  }
-
-  def addResponseListener(onResponse: Int => Unit) {
-    responseListeners ::= onResponse
-  }
-
   def updateErrorMetrics(apiKey: ApiKeys, errors: collection.Map[Errors, Integer]) {
     errors.foreach { case (error, count) =>
       metrics(apiKey.name).markErrorMeter(error, count)
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index c11ebcb..fef412b 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -119,15 +119,15 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
     val recvBufferSize = config.socketReceiveBufferBytes
     val brokerId = config.brokerId
 
-    val numProcessorThreads = config.numNetworkThreads
     endpoints.foreach { endpoint =>
       val listenerName = endpoint.listenerName
       val securityProtocol = endpoint.securityProtocol
       val listenerProcessors = new ArrayBuffer[Processor]()
 
       for (i <- 0 until newProcessorsPerListener) {
-        listenerProcessors += newProcessor(nextProcessorId, connectionQuotas, listenerName, securityProtocol, memoryPool)
-        requestChannel.addProcessor(nextProcessorId)
+        val processor = newProcessor(nextProcessorId, connectionQuotas, listenerName, securityProtocol, memoryPool)
+        listenerProcessors += processor
+        requestChannel.addProcessor(processor)
         nextProcessorId += 1
       }
       listenerProcessors.foreach(p => processors.put(p.id, p))
@@ -140,21 +140,14 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
     }
   }
 
-  // register the processor threads for notification of responses
-  requestChannel.addResponseListener(id => {
-    val processor = processors.get(id)
-    if (processor != null)
-      processor.wakeup()
-  })
-
   /**
     * Stop processing requests and new connections.
     */
   def stopProcessingRequests() = {
     info("Stopping socket server request processors")
     this.synchronized {
-      acceptors.asScala.values.foreach(_.shutdown)
-      processors.asScala.values.foreach(_.shutdown)
+      acceptors.asScala.values.foreach(_.shutdown())
+      processors.asScala.values.foreach(_.shutdown())
       requestChannel.clear()
       stoppedProcessingRequests = true
     }
@@ -475,11 +468,20 @@ private[kafka] class Processor(val id: Int,
 
   private val newConnections = new ConcurrentLinkedQueue[SocketChannel]()
   private val inflightResponses = mutable.Map[String, RequestChannel.Response]()
+  private val responseQueue = new LinkedBlockingDeque[RequestChannel.Response]()
+
   private[kafka] val metricTags = mutable.LinkedHashMap(
     "listener" -> listenerName.value,
     "networkProcessor" -> id.toString
   ).asJava
 
+  newGauge("ResponseQueueSize",
+    new Gauge[Int] {
+      def value = responseQueue.size()
+    },
+    Map("processor" -> id.toString)
+  )
+
   newGauge("IdlePercent",
     new Gauge[Double] {
       def value = {
@@ -562,7 +564,7 @@ private[kafka] class Processor(val id: Int,
 
   private def processChannelException(channelId: String, errorMessage: String, throwable: Throwable) {
     if (openOrClosingChannel(channelId).isDefined) {
-      error(s"Closing socket for ${channelId} because of error", throwable)
+      error(s"Closing socket for $channelId because of error", throwable)
       close(channelId)
     }
     processException(errorMessage, throwable)
@@ -570,7 +572,7 @@ private[kafka] class Processor(val id: Int,
 
   private def processNewResponses() {
     var curr: RequestChannel.Response = null
-    while ({curr = requestChannel.receiveResponse(id); curr != null}) {
+    while ({curr = dequeueResponse(); curr != null}) {
       val channelId = curr.request.context.connectionId
       try {
         curr.responseAction match {
@@ -753,6 +755,20 @@ private[kafka] class Processor(val id: Int,
     connId
   }
 
+  private[network] def enqueueResponse(response: RequestChannel.Response): Unit = {
+    responseQueue.put(response)
+    wakeup()
+  }
+
+  private def dequeueResponse(): RequestChannel.Response = {
+    val response = responseQueue.poll()
+    if (response != null)
+      response.request.responseDequeueTimeNanos = Time.SYSTEM.nanoseconds
+    response
+  }
+
+  private[network] def responseQueueSize = responseQueue.size
+
   // Only for testing
   private[network] def inflightResponseCount: Int = inflightResponses.size
 
@@ -772,8 +788,13 @@ private[kafka] class Processor(val id: Int,
   /**
    * Wakeup the thread for selection.
    */
-  @Override
-  def wakeup = selector.wakeup()
+  override def wakeup() = selector.wakeup()
+
+  override def shutdown(): Unit = {
+    super.shutdown()
+    removeMetric("ResponseQueueSize", Map("processor" -> id.toString))
+    removeMetric("IdlePercent", Map("networkProcessor" -> id.toString))
+  }
 
 }
 

-- 
To stop receiving notification emails like this one, please contact
jgus@apache.org.