You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2017/11/15 09:54:58 UTC
kafka git commit: MINOR: Handle error metrics removal during shutdown
Repository: kafka
Updated Branches:
refs/heads/trunk d04daf570 -> 3cfbb25c6
MINOR: Handle error metrics removal during shutdown
Author: Rajini Sivaram <ra...@googlemail.com>
Reviewers: Ismael Juma <is...@juma.me.uk>
Closes #4187 from rajinisivaram/MINOR-metrics-cleanup
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3cfbb25c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3cfbb25c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3cfbb25c
Branch: refs/heads/trunk
Commit: 3cfbb25c616bee44ac181e9320d4fd6d79ab9c58
Parents: d04daf5
Author: Rajini Sivaram <ra...@googlemail.com>
Authored: Wed Nov 15 09:54:42 2017 +0000
Committer: Rajini Sivaram <ra...@googlemail.com>
Committed: Wed Nov 15 09:54:42 2017 +0000
----------------------------------------------------------------------
.../scala/kafka/network/RequestChannel.scala | 7 +++++-
.../main/scala/kafka/network/SocketServer.scala | 24 ++++++++++++++++----
.../kafka/server/KafkaRequestHandler.scala | 1 -
.../main/scala/kafka/server/KafkaServer.scala | 9 +++++++-
.../unit/kafka/network/SocketServerTest.scala | 7 +++---
5 files changed, 37 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/3cfbb25c/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index a4ec5e3..a50af45 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -315,10 +315,15 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe
}
}
- def shutdown() {
+ def clear() {
requestQueue.clear()
}
+ def shutdown() {
+ clear()
+ metrics.close()
+ }
+
def sendShutdownRequest(): Unit = requestQueue.put(ShutdownRequest)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/3cfbb25c/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index bea8f79..4366fea 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -72,6 +72,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
private[network] val acceptors = mutable.Map[EndPoint, Acceptor]()
private var connectionQuotas: ConnectionQuotas = _
+ private var stoppedProcessingRequests = false
/**
* Start the socket server
@@ -132,13 +133,28 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
requestChannel.addResponseListener(id => processors(id).wakeup())
/**
- * Shutdown the socket server
- */
- def shutdown() = {
- info("Shutting down")
+ * Stop processing requests and new connections.
+ */
+ def stopProcessingRequests() = {
+ info("Stopping socket server request processors")
this.synchronized {
acceptors.values.foreach(_.shutdown)
processors.foreach(_.shutdown)
+ requestChannel.clear()
+ stoppedProcessingRequests = true
+ }
+ info("Stopped socket server request processors")
+ }
+
+ /**
+ * Shut down the socket server. If still processing requests, shut down
+ * acceptors and processors first.
+ */
+ def shutdown() = {
+ info("Shutting down socket server")
+ this.synchronized {
+ if (!stoppedProcessingRequests)
+ stopProcessingRequests()
requestChannel.shutdown()
}
info("Shutdown completed")
http://git-wip-us.apache.org/repos/asf/kafka/blob/3cfbb25c/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
index 3d8dbd9..a498781 100755
--- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
+++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -105,7 +105,6 @@ class KafkaRequestHandlerPool(val brokerId: Int,
handler.initiateShutdown()
for (handler <- runnables)
handler.awaitShutdown()
- requestChannel.metrics.close()
info("shut down completely")
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/3cfbb25c/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index a0732fd..a13f5af 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -526,8 +526,10 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
if (dynamicConfigManager != null)
CoreUtils.swallow(dynamicConfigManager.shutdown())
+ // Stop socket server to stop accepting any more connections and requests.
+ // Socket server will be shut down towards the end of the sequence.
if (socketServer != null)
- CoreUtils.swallow(socketServer.shutdown())
+ CoreUtils.swallow(socketServer.stopProcessingRequests())
if (requestHandlerPool != null)
CoreUtils.swallow(requestHandlerPool.shutdown())
@@ -558,6 +560,11 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
if (quotaManagers != null)
CoreUtils.swallow(quotaManagers.shutdown())
+ // Even though the socket server is stopped much earlier, the controller can still
+ // generate a response for the controlled shutdown request. Shut down the socket server
+ // at the end to avoid any failures (e.g. when metrics are recorded).
+ if (socketServer != null)
+ CoreUtils.swallow(socketServer.shutdown())
if (metrics != null)
CoreUtils.swallow(metrics.close())
if (brokerTopicStats != null)
http://git-wip-us.apache.org/repos/asf/kafka/blob/3cfbb25c/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 0c05c46..024e7f9 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -136,7 +136,6 @@ class SocketServerTest extends JUnitSuite {
def shutdownServerAndMetrics(server: SocketServer): Unit = {
server.shutdown()
server.metrics.close()
- server.requestChannel.metrics.close()
}
@After
@@ -576,8 +575,8 @@ class SocketServerTest extends JUnitSuite {
}
@Test
- def testRequestMetricsAfterShutdown(): Unit = {
- server.shutdown()
+ def testRequestMetricsAfterStop(): Unit = {
+ server.stopProcessingRequests()
server.requestChannel.metrics(ApiKeys.PRODUCE.name).requestRate.mark()
server.requestChannel.updateErrorMetrics(ApiKeys.PRODUCE, Map(Errors.NONE -> 1))
@@ -591,7 +590,7 @@ class SocketServerTest extends JUnitSuite {
.collect { case (k, metric: Meter) => (k.toString, metric.count) }
assertEquals(nonZeroMeters, requestMetricMeters.filter { case (_, value) => value != 0 })
- server.requestChannel.metrics.close()
+ server.shutdown()
assertEquals(Map.empty, requestMetricMeters)
}