You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2015/05/18 22:03:12 UTC

kafka git commit: kafka-2169; Upgrade to zkclient-0.5; patched by Parth Brahmbhatt; reviewed by Jun Rao

Repository: kafka
Updated Branches:
  refs/heads/trunk 49026f117 -> 41ba26273


kafka-2169; Upgrade to zkclient-0.5; patched by Parth Brahmbhatt; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/41ba2627
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/41ba2627
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/41ba2627

Branch: refs/heads/trunk
Commit: 41ba26273b497e4cbcc947c742ff6831b7320152
Parents: 49026f1
Author: Parth Brahmbhatt <pb...@hortonworks.com>
Authored: Mon May 18 13:02:47 2015 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Mon May 18 13:02:47 2015 -0700

----------------------------------------------------------------------
 build.gradle                                                    | 2 +-
 .../main/scala/kafka/consumer/ZookeeperConsumerConnector.scala  | 3 +++
 .../main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala  | 4 ++++
 core/src/main/scala/kafka/controller/KafkaController.scala      | 4 ++++
 core/src/main/scala/kafka/server/KafkaHealthcheck.scala         | 5 +++++
 5 files changed, 17 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/41ba2627/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index fef515b..cd2aa83 100644
--- a/build.gradle
+++ b/build.gradle
@@ -207,7 +207,7 @@ project(':core') {
     compile project(':clients')
     compile "org.scala-lang:scala-library:$scalaVersion"
     compile 'org.apache.zookeeper:zookeeper:3.4.6'
-    compile 'com.101tec:zkclient:0.3'
+    compile 'com.101tec:zkclient:0.5'
     compile 'com.yammer.metrics:metrics-core:2.2.0'
     compile 'net.sf.jopt-simple:jopt-simple:3.2'
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/41ba2627/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index aa8d940..0b0dca1 100755
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -507,6 +507,9 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
       // The child change watchers will be set inside rebalance when we read the children list.
     }
 
+    override def handleSessionEstablishmentError(error: Throwable): Unit = {
+      fatal("Could not establish session with zookeeper", error)
+    }
   }
 
   class ZKTopicPartitionChangeListener(val loadBalancerListener: ZKRebalancerListener)

http://git-wip-us.apache.org/repos/asf/kafka/blob/41ba2627/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala b/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
index 38f4ec0..f2fa36f 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
@@ -93,6 +93,10 @@ class ZookeeperTopicEventWatcher(val zkClient: ZkClient,
         }
       }
     }
+
+    override def handleSessionEstablishmentError(error: Throwable): Unit = {
+      // No-op: the session listener in ZookeeperConsumerConnector logs the error.
+    }
   }
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/41ba2627/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index a635116..69bba24 100755
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -1112,6 +1112,10 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
         controllerElector.elect
       }
     }
+
+    override def handleSessionEstablishmentError(error: Throwable): Unit = {
+      // No-op: handleSessionEstablishmentError in KafkaHealthcheck logs the error and then calls System.exit.
+    }
   }
 
   private def checkAndTriggerPartitionRebalance(): Unit = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/41ba2627/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
index 861b7f6..ea0c996 100644
--- a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
+++ b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
@@ -89,6 +89,11 @@ class KafkaHealthcheck(private val brokerId: Int,
       info("done re-registering broker")
       info("Subscribing to %s path to watch for new topics".format(ZkUtils.BrokerTopicsPath))
     }
+
+    override def handleSessionEstablishmentError(error: Throwable): Unit = {
+      fatal("Could not establish session with zookeeper", error)
+      System.exit(-1)
+    }
   }
 
 }