You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2014/02/21 02:44:53 UTC
git commit: kafka-1235; Enable server to indefinitely retry on controlled shutdown; patched by Guozhang Wang; reviewed by Jun Rao
Repository: kafka
Updated Branches:
refs/heads/trunk 90bea0b12 -> 6ab9b1ecd
kafka-1235; Enable server to indefinitely retry on controlled shutdown; patched by Guozhang Wang; reviewed by Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6ab9b1ec
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6ab9b1ec
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6ab9b1ec
Branch: refs/heads/trunk
Commit: 6ab9b1ecd89c74a42233fc792684e7a8757e9460
Parents: 90bea0b
Author: Guozhang Wang <gu...@linkedin.com>
Authored: Thu Feb 20 17:44:41 2014 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Thu Feb 20 17:44:41 2014 -0800
----------------------------------------------------------------------
.../scala/kafka/consumer/SimpleConsumer.scala | 8 +++-----
.../scala/kafka/network/BlockingChannel.scala | 18 +++++++++++-------
.../main/scala/kafka/producer/SyncProducer.scala | 6 ++----
.../src/main/scala/kafka/server/KafkaConfig.scala | 2 +-
.../scala/kafka/server/KafkaHealthcheck.scala | 8 +++++++-
.../src/main/scala/kafka/server/KafkaServer.scala | 6 ++++--
core/src/main/scala/kafka/utils/ZkUtils.scala | 6 ++++++
.../scala/unit/kafka/server/LogRecoveryTest.scala | 2 +-
.../unit/kafka/zk/ZooKeeperTestHarness.scala | 4 ++--
9 files changed, 37 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/6ab9b1ec/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
index 6dae149..098d6e4 100644
--- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
@@ -46,10 +46,8 @@ class SimpleConsumer(val host: String,
}
private def disconnect() = {
- if(blockingChannel.isConnected) {
- debug("Disconnecting from " + host + ":" + port)
- blockingChannel.disconnect()
- }
+ debug("Disconnecting from " + host + ":" + port)
+ blockingChannel.disconnect()
}
private def reconnect() {
@@ -66,9 +64,9 @@ class SimpleConsumer(val host: String,
private def sendRequest(request: RequestOrResponse): Receive = {
lock synchronized {
- getOrMakeConnection()
var response: Receive = null
try {
+ getOrMakeConnection()
blockingChannel.send(request)
response = blockingChannel.receive()
} catch {
http://git-wip-us.apache.org/repos/asf/kafka/blob/6ab9b1ec/core/src/main/scala/kafka/network/BlockingChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/BlockingChannel.scala b/core/src/main/scala/kafka/network/BlockingChannel.scala
index ab04b3f..b894fa6 100644
--- a/core/src/main/scala/kafka/network/BlockingChannel.scala
+++ b/core/src/main/scala/kafka/network/BlockingChannel.scala
@@ -71,19 +71,23 @@ class BlockingChannel( val host: String,
}
def disconnect() = lock synchronized {
- if(connected || channel != null) {
- // closing the main socket channel *should* close the read channel
- // but let's do it to be sure.
+ if(channel != null) {
swallow(channel.close())
swallow(channel.socket.close())
- if(readChannel != null) swallow(readChannel.close())
- channel = null; readChannel = null; writeChannel = null
- connected = false
+ channel = null
+ writeChannel = null
}
+ // closing the main socket channel *should* close the read channel
+ // but let's do it to be sure.
+ if(readChannel != null) {
+ swallow(readChannel.close())
+ readChannel = null
+ }
+ connected = false
}
def isConnected = connected
-
+
def send(request: RequestOrResponse):Int = {
if(!connected)
throw new ClosedChannelException()
http://git-wip-us.apache.org/repos/asf/kafka/blob/6ab9b1ec/core/src/main/scala/kafka/producer/SyncProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/SyncProducer.scala b/core/src/main/scala/kafka/producer/SyncProducer.scala
index 041cfa5..489f007 100644
--- a/core/src/main/scala/kafka/producer/SyncProducer.scala
+++ b/core/src/main/scala/kafka/producer/SyncProducer.scala
@@ -126,10 +126,8 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
*/
private def disconnect() {
try {
- if(blockingChannel.isConnected) {
- info("Disconnecting from " + config.host + ":" + config.port)
- blockingChannel.disconnect()
- }
+ info("Disconnecting from " + config.host + ":" + config.port)
+ blockingChannel.disconnect()
} catch {
case e: Exception => error("Error on disconnect: ", e)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/6ab9b1ec/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 3c3aafc..04a5d39 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -33,7 +33,7 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
}
private def getLogRetentionTimeMillis(): Long = {
- var millisInMinute = 60L * 1000L
+ val millisInMinute = 60L * 1000L
val millisInHour = 60L * millisInMinute
if(props.containsKey("log.retention.minutes")){
millisInMinute * props.getIntInRange("log.retention.minutes", (1, Int.MaxValue))
http://git-wip-us.apache.org/repos/asf/kafka/blob/6ab9b1ec/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
index 8c69d09..4acdd70 100644
--- a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
+++ b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
@@ -38,12 +38,18 @@ class KafkaHealthcheck(private val brokerId: Int,
private val zkClient: ZkClient) extends Logging {
val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + brokerId
+ val sessionExpireListener = new SessionExpireListener
def startup() {
- zkClient.subscribeStateChanges(new SessionExpireListener)
+ zkClient.subscribeStateChanges(sessionExpireListener)
register()
}
+ def shutdown() {
+ zkClient.unsubscribeStateChanges(sessionExpireListener)
+ ZkUtils.deregisterBrokerInZk(zkClient, brokerId)
+ }
+
/**
* Register this broker as "alive" in zookeeper
*/
http://git-wip-us.apache.org/repos/asf/kafka/blob/6ab9b1ec/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 5e34f95..c606b50 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -134,9 +134,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
// the shutdown.
var remainingRetries = config.controlledShutdownMaxRetries
info("Starting controlled shutdown")
- var channel : BlockingChannel = null;
+ var channel : BlockingChannel = null
var prevController : Broker = null
- var shutdownSuceeded : Boolean =false
+ var shutdownSuceeded : Boolean = false
try {
while (!shutdownSuceeded && remainingRetries > 0) {
remainingRetries = remainingRetries - 1
@@ -218,6 +218,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
val canShutdown = isShuttingDown.compareAndSet(false, true);
if (canShutdown) {
Utils.swallow(controlledShutdown())
+ if(kafkaHealthcheck != null)
+ Utils.swallow(kafkaHealthcheck.shutdown())
if(socketServer != null)
Utils.swallow(socketServer.shutdown())
if(requestHandlerPool != null)
http://git-wip-us.apache.org/repos/asf/kafka/blob/6ab9b1ec/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index f8e798b..a198628 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -210,6 +210,12 @@ object ZkUtils extends Logging {
info("Registered broker %d at path %s with address %s:%d.".format(id, brokerIdPath, host, port))
}
+ def deregisterBrokerInZk(zkClient: ZkClient, id: Int) {
+ val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + id
+ deletePath(zkClient, brokerIdPath)
+ info("Deregistered broker %d at path %s.".format(id, brokerIdPath))
+ }
+
def getConsumerPartitionOwnerPath(group: String, topic: String, partition: Int): String = {
val topicDirs = new ZKGroupTopicDirs(group, topic)
topicDirs.consumerOwnerDir + "/" + partition
http://git-wip-us.apache.org/repos/asf/kafka/blob/6ab9b1ec/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
index 17a99f1..db0e58b 100644
--- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
@@ -56,11 +56,11 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
producerProps.put("request.required.acks", "-1")
override def tearDown() {
- super.tearDown()
for(server <- servers) {
server.shutdown()
Utils.rm(server.config.logDirs(0))
}
+ super.tearDown()
}
def testHWCheckpointNoFailuresSingleLogSegment {
http://git-wip-us.apache.org/repos/asf/kafka/blob/6ab9b1ec/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
index 4e25b92..67d9c4b 100644
--- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
@@ -29,15 +29,15 @@ trait ZooKeeperTestHarness extends JUnit3Suite {
val zkSessionTimeout = 6000
override def setUp() {
+ super.setUp
zookeeper = new EmbeddedZookeeper(zkConnect)
zkClient = new ZkClient(zookeeper.connectString, zkSessionTimeout, zkConnectionTimeout, ZKStringSerializer)
- super.setUp
}
override def tearDown() {
- super.tearDown
Utils.swallow(zkClient.close())
Utils.swallow(zookeeper.shutdown())
+ super.tearDown
}
}