You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2017/11/15 09:54:58 UTC

kafka git commit: MINOR: Handle error metrics removal during shutdown

Repository: kafka
Updated Branches:
  refs/heads/trunk d04daf570 -> 3cfbb25c6


MINOR: Handle error metrics removal during shutdown

Author: Rajini Sivaram <ra...@googlemail.com>

Reviewers: Ismael Juma <is...@juma.me.uk>

Closes #4187 from rajinisivaram/MINOR-metrics-cleanup


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3cfbb25c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3cfbb25c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3cfbb25c

Branch: refs/heads/trunk
Commit: 3cfbb25c616bee44ac181e9320d4fd6d79ab9c58
Parents: d04daf5
Author: Rajini Sivaram <ra...@googlemail.com>
Authored: Wed Nov 15 09:54:42 2017 +0000
Committer: Rajini Sivaram <ra...@googlemail.com>
Committed: Wed Nov 15 09:54:42 2017 +0000

----------------------------------------------------------------------
 .../scala/kafka/network/RequestChannel.scala    |  7 +++++-
 .../main/scala/kafka/network/SocketServer.scala | 24 ++++++++++++++++----
 .../kafka/server/KafkaRequestHandler.scala      |  1 -
 .../main/scala/kafka/server/KafkaServer.scala   |  9 +++++++-
 .../unit/kafka/network/SocketServerTest.scala   |  7 +++---
 5 files changed, 37 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3cfbb25c/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index a4ec5e3..a50af45 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -315,10 +315,15 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe
     }
   }
 
-  def shutdown() {
+  def clear() {
     requestQueue.clear()
   }
 
+  def shutdown() {
+    clear()
+    metrics.close()
+  }
+
   def sendShutdownRequest(): Unit = requestQueue.put(ShutdownRequest)
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/3cfbb25c/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index bea8f79..4366fea 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -72,6 +72,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
 
   private[network] val acceptors = mutable.Map[EndPoint, Acceptor]()
   private var connectionQuotas: ConnectionQuotas = _
+  private var stoppedProcessingRequests = false
 
   /**
    * Start the socket server
@@ -132,13 +133,28 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
   requestChannel.addResponseListener(id => processors(id).wakeup())
 
   /**
-   * Shutdown the socket server
-   */
-  def shutdown() = {
-    info("Shutting down")
+    * Stop processing requests and new connections.
+    */
+  def stopProcessingRequests() = {
+    info("Stopping socket server request processors")
     this.synchronized {
       acceptors.values.foreach(_.shutdown)
       processors.foreach(_.shutdown)
+      requestChannel.clear()
+      stoppedProcessingRequests = true
+    }
+    info("Stopped socket server request processors")
+  }
+
+  /**
+    * Shutdown the socket server. If still processing requests, shutdown
+    * acceptors and processors first.
+    */
+  def shutdown() = {
+    info("Shutting down socket server")
+    this.synchronized {
+      if (!stoppedProcessingRequests)
+        stopProcessingRequests()
       requestChannel.shutdown()
     }
     info("Shutdown completed")

http://git-wip-us.apache.org/repos/asf/kafka/blob/3cfbb25c/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
index 3d8dbd9..a498781 100755
--- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
+++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -105,7 +105,6 @@ class KafkaRequestHandlerPool(val brokerId: Int,
       handler.initiateShutdown()
     for (handler <- runnables)
       handler.awaitShutdown()
-    requestChannel.metrics.close()
     info("shut down completely")
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3cfbb25c/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index a0732fd..a13f5af 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -526,8 +526,10 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
         if (dynamicConfigManager != null)
           CoreUtils.swallow(dynamicConfigManager.shutdown())
 
+        // Stop socket server to stop accepting any more connections and requests.
+        // Socket server will be shutdown towards the end of the sequence.
         if (socketServer != null)
-          CoreUtils.swallow(socketServer.shutdown())
+          CoreUtils.swallow(socketServer.stopProcessingRequests())
         if (requestHandlerPool != null)
           CoreUtils.swallow(requestHandlerPool.shutdown())
 
@@ -558,6 +560,11 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
 
         if (quotaManagers != null)
           CoreUtils.swallow(quotaManagers.shutdown())
+        // Even though socket server is stopped much earlier, controller can generate
+        // response for controlled shutdown request. Shutdown server at the end to
+        // avoid any failures (e.g. when metrics are recorded)
+        if (socketServer != null)
+          CoreUtils.swallow(socketServer.shutdown())
         if (metrics != null)
           CoreUtils.swallow(metrics.close())
         if (brokerTopicStats != null)

http://git-wip-us.apache.org/repos/asf/kafka/blob/3cfbb25c/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 0c05c46..024e7f9 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -136,7 +136,6 @@ class SocketServerTest extends JUnitSuite {
   def shutdownServerAndMetrics(server: SocketServer): Unit = {
     server.shutdown()
     server.metrics.close()
-    server.requestChannel.metrics.close()
   }
 
   @After
@@ -576,8 +575,8 @@ class SocketServerTest extends JUnitSuite {
   }
 
   @Test
-  def testRequestMetricsAfterShutdown(): Unit = {
-    server.shutdown()
+  def testRequestMetricsAfterStop(): Unit = {
+    server.stopProcessingRequests()
 
     server.requestChannel.metrics(ApiKeys.PRODUCE.name).requestRate.mark()
     server.requestChannel.updateErrorMetrics(ApiKeys.PRODUCE, Map(Errors.NONE -> 1))
@@ -591,7 +590,7 @@ class SocketServerTest extends JUnitSuite {
       .collect { case (k, metric: Meter) => (k.toString, metric.count) }
 
     assertEquals(nonZeroMeters, requestMetricMeters.filter { case (_, value) => value != 0 })
-    server.requestChannel.metrics.close()
+    server.shutdown()
     assertEquals(Map.empty, requestMetricMeters)
   }