You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2014/02/21 02:44:53 UTC

git commit: kafka-1235; Enable server to indefinitely retry on controlled shutdown; patched by Guozhang Wang; reviewed by Jun Rao

Repository: kafka
Updated Branches:
  refs/heads/trunk 90bea0b12 -> 6ab9b1ecd


kafka-1235; Enable server to indefinitely retry on controlled shutdown; patched by Guozhang Wang; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6ab9b1ec
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6ab9b1ec
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6ab9b1ec

Branch: refs/heads/trunk
Commit: 6ab9b1ecd89c74a42233fc792684e7a8757e9460
Parents: 90bea0b
Author: Guozhang Wang <gu...@linkedin.com>
Authored: Thu Feb 20 17:44:41 2014 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Thu Feb 20 17:44:41 2014 -0800

----------------------------------------------------------------------
 .../scala/kafka/consumer/SimpleConsumer.scala     |  8 +++-----
 .../scala/kafka/network/BlockingChannel.scala     | 18 +++++++++++-------
 .../main/scala/kafka/producer/SyncProducer.scala  |  6 ++----
 .../src/main/scala/kafka/server/KafkaConfig.scala |  2 +-
 .../scala/kafka/server/KafkaHealthcheck.scala     |  8 +++++++-
 .../src/main/scala/kafka/server/KafkaServer.scala |  6 ++++--
 core/src/main/scala/kafka/utils/ZkUtils.scala     |  6 ++++++
 .../scala/unit/kafka/server/LogRecoveryTest.scala |  2 +-
 .../unit/kafka/zk/ZooKeeperTestHarness.scala      |  4 ++--
 9 files changed, 37 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/6ab9b1ec/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
index 6dae149..098d6e4 100644
--- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
@@ -46,10 +46,8 @@ class SimpleConsumer(val host: String,
   }
 
   private def disconnect() = {
-    if(blockingChannel.isConnected) {
-      debug("Disconnecting from " + host + ":" + port)
-      blockingChannel.disconnect()
-    }
+    debug("Disconnecting from " + host + ":" + port)
+    blockingChannel.disconnect()
   }
 
   private def reconnect() {
@@ -66,9 +64,9 @@ class SimpleConsumer(val host: String,
   
   private def sendRequest(request: RequestOrResponse): Receive = {
     lock synchronized {
-      getOrMakeConnection()
       var response: Receive = null
       try {
+        getOrMakeConnection()
         blockingChannel.send(request)
         response = blockingChannel.receive()
       } catch {

http://git-wip-us.apache.org/repos/asf/kafka/blob/6ab9b1ec/core/src/main/scala/kafka/network/BlockingChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/BlockingChannel.scala b/core/src/main/scala/kafka/network/BlockingChannel.scala
index ab04b3f..b894fa6 100644
--- a/core/src/main/scala/kafka/network/BlockingChannel.scala
+++ b/core/src/main/scala/kafka/network/BlockingChannel.scala
@@ -71,19 +71,23 @@ class BlockingChannel( val host: String,
   }
   
   def disconnect() = lock synchronized {
-    if(connected || channel != null) {
-      // closing the main socket channel *should* close the read channel
-      // but let's do it to be sure.
+    if(channel != null) {
       swallow(channel.close())
       swallow(channel.socket.close())
-      if(readChannel != null) swallow(readChannel.close())
-      channel = null; readChannel = null; writeChannel = null
-      connected = false
+      channel = null
+      writeChannel = null
     }
+    // closing the main socket channel *should* close the read channel
+    // but let's do it to be sure.
+    if(readChannel != null) {
+      swallow(readChannel.close())
+      readChannel = null
+    }
+    connected = false
   }
 
   def isConnected = connected
-  
+
   def send(request: RequestOrResponse):Int = {
     if(!connected)
       throw new ClosedChannelException()

http://git-wip-us.apache.org/repos/asf/kafka/blob/6ab9b1ec/core/src/main/scala/kafka/producer/SyncProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/SyncProducer.scala b/core/src/main/scala/kafka/producer/SyncProducer.scala
index 041cfa5..489f007 100644
--- a/core/src/main/scala/kafka/producer/SyncProducer.scala
+++ b/core/src/main/scala/kafka/producer/SyncProducer.scala
@@ -126,10 +126,8 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
    */
   private def disconnect() {
     try {
-      if(blockingChannel.isConnected) {
-        info("Disconnecting from " + config.host + ":" + config.port)
-        blockingChannel.disconnect()
-      }
+      info("Disconnecting from " + config.host + ":" + config.port)
+      blockingChannel.disconnect()
     } catch {
       case e: Exception => error("Error on disconnect: ", e)
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/6ab9b1ec/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 3c3aafc..04a5d39 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -33,7 +33,7 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
   }
   
   private def getLogRetentionTimeMillis(): Long = {
-    var millisInMinute = 60L * 1000L
+    val millisInMinute = 60L * 1000L
     val millisInHour = 60L * millisInMinute
     if(props.containsKey("log.retention.minutes")){
        millisInMinute * props.getIntInRange("log.retention.minutes", (1, Int.MaxValue))

http://git-wip-us.apache.org/repos/asf/kafka/blob/6ab9b1ec/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
index 8c69d09..4acdd70 100644
--- a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
+++ b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
@@ -38,12 +38,18 @@ class KafkaHealthcheck(private val brokerId: Int,
                        private val zkClient: ZkClient) extends Logging {
 
   val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + brokerId
+  val sessionExpireListener = new SessionExpireListener
   
   def startup() {
-    zkClient.subscribeStateChanges(new SessionExpireListener)
+    zkClient.subscribeStateChanges(sessionExpireListener)
     register()
   }
 
+  def shutdown() {
+    zkClient.unsubscribeStateChanges(sessionExpireListener)
+    ZkUtils.deregisterBrokerInZk(zkClient, brokerId)
+  }
+
   /**
    * Register this broker as "alive" in zookeeper
    */

http://git-wip-us.apache.org/repos/asf/kafka/blob/6ab9b1ec/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 5e34f95..c606b50 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -134,9 +134,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
       // the shutdown.
       var remainingRetries = config.controlledShutdownMaxRetries
       info("Starting controlled shutdown")
-      var channel : BlockingChannel = null;
+      var channel : BlockingChannel = null
       var prevController : Broker = null
-      var shutdownSuceeded : Boolean =false
+      var shutdownSuceeded : Boolean = false
       try {
         while (!shutdownSuceeded && remainingRetries > 0) {
           remainingRetries = remainingRetries - 1
@@ -218,6 +218,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
     val canShutdown = isShuttingDown.compareAndSet(false, true);
     if (canShutdown) {
       Utils.swallow(controlledShutdown())
+      if(kafkaHealthcheck != null)
+        Utils.swallow(kafkaHealthcheck.shutdown())
       if(socketServer != null)
         Utils.swallow(socketServer.shutdown())
       if(requestHandlerPool != null)

http://git-wip-us.apache.org/repos/asf/kafka/blob/6ab9b1ec/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index f8e798b..a198628 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -210,6 +210,12 @@ object ZkUtils extends Logging {
     info("Registered broker %d at path %s with address %s:%d.".format(id, brokerIdPath, host, port))
   }
 
+  def deregisterBrokerInZk(zkClient: ZkClient, id: Int) {
+    val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + id
+    deletePath(zkClient, brokerIdPath)
+    info("Deregistered broker %d at path %s.".format(id, brokerIdPath))
+  }
+
   def getConsumerPartitionOwnerPath(group: String, topic: String, partition: Int): String = {
     val topicDirs = new ZKGroupTopicDirs(group, topic)
     topicDirs.consumerOwnerDir + "/" + partition

http://git-wip-us.apache.org/repos/asf/kafka/blob/6ab9b1ec/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
index 17a99f1..db0e58b 100644
--- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
@@ -56,11 +56,11 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
   producerProps.put("request.required.acks", "-1")
   
   override def tearDown() {
-    super.tearDown()
     for(server <- servers) {
       server.shutdown()
       Utils.rm(server.config.logDirs(0))
     }
+    super.tearDown()
   }
 
   def testHWCheckpointNoFailuresSingleLogSegment {

http://git-wip-us.apache.org/repos/asf/kafka/blob/6ab9b1ec/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
index 4e25b92..67d9c4b 100644
--- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
@@ -29,15 +29,15 @@ trait ZooKeeperTestHarness extends JUnit3Suite {
   val zkSessionTimeout = 6000
 
   override def setUp() {
+    super.setUp
     zookeeper = new EmbeddedZookeeper(zkConnect)
     zkClient = new ZkClient(zookeeper.connectString, zkSessionTimeout, zkConnectionTimeout, ZKStringSerializer)
-    super.setUp
   }
 
   override def tearDown() {
-    super.tearDown
     Utils.swallow(zkClient.close())
     Utils.swallow(zookeeper.shutdown())
+    super.tearDown
   }
 
 }