You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2015/05/18 22:03:12 UTC
kafka git commit: kafka-2169; Upgrade to zkclient-0.5;
patched by Parth Brahmbhatt; reviewed by Jun Rao
Repository: kafka
Updated Branches:
refs/heads/trunk 49026f117 -> 41ba26273
kafka-2169; Upgrade to zkclient-0.5; patched by Parth Brahmbhatt; reviewed by Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/41ba2627
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/41ba2627
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/41ba2627
Branch: refs/heads/trunk
Commit: 41ba26273b497e4cbcc947c742ff6831b7320152
Parents: 49026f1
Author: Parth Brahmbhatt <pb...@hortonworks.com>
Authored: Mon May 18 13:02:47 2015 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Mon May 18 13:02:47 2015 -0700
----------------------------------------------------------------------
build.gradle | 2 +-
.../main/scala/kafka/consumer/ZookeeperConsumerConnector.scala | 3 +++
.../main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala | 4 ++++
core/src/main/scala/kafka/controller/KafkaController.scala | 4 ++++
core/src/main/scala/kafka/server/KafkaHealthcheck.scala | 5 +++++
5 files changed, 17 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/41ba2627/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index fef515b..cd2aa83 100644
--- a/build.gradle
+++ b/build.gradle
@@ -207,7 +207,7 @@ project(':core') {
compile project(':clients')
compile "org.scala-lang:scala-library:$scalaVersion"
compile 'org.apache.zookeeper:zookeeper:3.4.6'
- compile 'com.101tec:zkclient:0.3'
+ compile 'com.101tec:zkclient:0.5'
compile 'com.yammer.metrics:metrics-core:2.2.0'
compile 'net.sf.jopt-simple:jopt-simple:3.2'
http://git-wip-us.apache.org/repos/asf/kafka/blob/41ba2627/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index aa8d940..0b0dca1 100755
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -507,6 +507,9 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
// The child change watchers will be set inside rebalance when we read the children list.
}
+ override def handleSessionEstablishmentError(error: Throwable): Unit = {
+ fatal("Could not establish session with zookeeper", error)
+ }
}
class ZKTopicPartitionChangeListener(val loadBalancerListener: ZKRebalancerListener)
http://git-wip-us.apache.org/repos/asf/kafka/blob/41ba2627/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala b/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
index 38f4ec0..f2fa36f 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
@@ -93,6 +93,10 @@ class ZookeeperTopicEventWatcher(val zkClient: ZkClient,
}
}
}
+
+ override def handleSessionEstablishmentError(error: Throwable): Unit = {
+ // No-op: the session listener in ZookeeperConsumerConnector logs this error.
+ }
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/41ba2627/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index a635116..69bba24 100755
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -1112,6 +1112,10 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
controllerElector.elect
}
}
+
+ override def handleSessionEstablishmentError(error: Throwable): Unit = {
+ // No-op: handleSessionEstablishmentError in KafkaHealthcheck logs the error and calls System.exit.
+ }
}
private def checkAndTriggerPartitionRebalance(): Unit = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/41ba2627/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
index 861b7f6..ea0c996 100644
--- a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
+++ b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
@@ -89,6 +89,11 @@ class KafkaHealthcheck(private val brokerId: Int,
info("done re-registering broker")
info("Subscribing to %s path to watch for new topics".format(ZkUtils.BrokerTopicsPath))
}
+
+ override def handleSessionEstablishmentError(error: Throwable): Unit = {
+ fatal("Could not establish session with zookeeper", error)
+ System.exit(-1)
+ }
}
}