You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/03/04 05:22:03 UTC

[7/37] git commit: KAFKA-749 Bug in socket server shutdown logic makes the broker hang on shutdown until it has to be killed; reviewed by Sriram and Jay Kreps

KAFKA-749 Bug in socket server shutdown logic makes the broker hang on shutdown until it has to be killed; reviewed by Sriram and Jay Kreps


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/826f02a7
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/826f02a7
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/826f02a7

Branch: refs/heads/trunk
Commit: 826f02a74e00bec1efb9d60b43cf9e0702d8d58a
Parents: aed6c3c
Author: Neha Narkhede <ne...@gmail.com>
Authored: Tue Feb 5 20:11:03 2013 -0800
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Tue Feb 5 20:11:03 2013 -0800

----------------------------------------------------------------------
 .../main/scala/kafka/network/RequestChannel.scala  |    4 ++++
 .../main/scala/kafka/network/SocketServer.scala    |    1 +
 .../scala/kafka/server/KafkaRequestHandler.scala   |    2 +-
 core/src/main/scala/kafka/server/KafkaServer.scala |    4 ++--
 4 files changed, 8 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/826f02a7/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 9b0f7e9..7747ddd 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -126,6 +126,10 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe
   def addResponseListener(onResponse: Int => Unit) { 
     responseListeners ::= onResponse
   }
+
+  def shutdown() {
+    requestQueue.clear
+  }
 }
 
 object RequestMetrics {

http://git-wip-us.apache.org/repos/asf/kafka/blob/826f02a7/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index b056e25..8f0053a 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -71,6 +71,7 @@ class SocketServer(val brokerId: Int,
       acceptor.shutdown()
     for(processor <- processors)
       processor.shutdown()
+    requestChannel.shutdown
     info("shut down completely")
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/826f02a7/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
index f0c05a5..69ca058 100644
--- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
+++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -32,7 +32,7 @@ class KafkaRequestHandler(id: Int, brokerId: Int, val requestChannel: RequestCha
     while(true) {
       try {
         val req = requestChannel.receiveRequest()
-        if(req eq RequestChannel.AllDone){
+        if(req eq RequestChannel.AllDone) {
           trace("receives shut down command, shut down".format(brokerId, id))
           return
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/826f02a7/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 1fe1ca9..09e261f 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -110,6 +110,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
     info("shutting down")
     val canShutdown = isShuttingDown.compareAndSet(false, true);
     if (canShutdown) {
+      if(socketServer != null)
+        Utils.swallow(socketServer.shutdown())
       if(requestHandlerPool != null)
         Utils.swallow(requestHandlerPool.shutdown())
       Utils.swallow(kafkaScheduler.shutdown())
@@ -119,8 +121,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
         Utils.swallow(kafkaZookeeper.shutdown())
       if(replicaManager != null)
         Utils.swallow(replicaManager.shutdown())
-      if(socketServer != null)
-        Utils.swallow(socketServer.shutdown())
       if(logManager != null)
         Utils.swallow(logManager.shutdown())