You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2015/02/28 00:10:11 UTC

kafka git commit: KAFKA-1664 Kafka does not properly parse multiple ZK nodes with non-root chroot; reviewed by Neha Narkhede and Jun Rao

Repository: kafka
Updated Branches:
  refs/heads/trunk a864172da -> b56f5973c


KAFKA-1664 Kafka does not properly parse multiple ZK nodes with non-root chroot; reviewed by Neha Narkhede and Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b56f5973
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b56f5973
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b56f5973

Branch: refs/heads/trunk
Commit: b56f5973c739072350f3f6bf6efa4eb05bc692bf
Parents: a864172
Author: Ashish Singh <as...@cloudera.com>
Authored: Fri Feb 27 15:09:41 2015 -0800
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Fri Feb 27 15:10:01 2015 -0800

----------------------------------------------------------------------
 core/src/main/scala/kafka/utils/ZkUtils.scala   |  46 ++++--
 .../test/scala/unit/kafka/zk/ZKPathTest.scala   | 147 +++++++++++++++++++
 2 files changed, 183 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b56f5973/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 8a2fb2d..7ae999e 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -23,6 +23,7 @@ import org.I0Itec.zkclient.ZkClient
 import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException,
   ZkMarshallingError, ZkBadVersionException}
 import org.I0Itec.zkclient.serialize.ZkSerializer
+import org.apache.kafka.common.config.ConfigException
 import collection._
 import kafka.api.LeaderAndIsr
 import org.apache.zookeeper.data.Stat
@@ -212,7 +213,7 @@ object ZkUtils extends Logging {
    */
   def makeSurePersistentPathExists(client: ZkClient, path: String) {
     if (!client.exists(path))
-      client.createPersistent(path, true) // won't throw NoNodeException or NodeExistsException
+      new ZkPath(client).createPersistent(path, true) // won't throw NoNodeException or NodeExistsException
   }
 
   /**
@@ -220,20 +221,22 @@ object ZkUtils extends Logging {
    */
   private def createParentPath(client: ZkClient, path: String): Unit = {
     val parentDir = path.substring(0, path.lastIndexOf('/'))
-    if (parentDir.length != 0)
-      client.createPersistent(parentDir, true)
+    if (parentDir.length != 0) {
+      new ZkPath(client).createPersistent(parentDir, true)
+    }
   }
 
   /**
    * Create an ephemeral node with the given path and data. Create parents if necessary.
    */
   private def createEphemeralPath(client: ZkClient, path: String, data: String): Unit = {
+    val zkPath = new ZkPath(client)
     try {
-      client.createEphemeral(path, data)
+      zkPath.createEphemeral(path, data)
     } catch {
       case e: ZkNoNodeException => {
         createParentPath(client, path)
-        client.createEphemeral(path, data)
+        zkPath.createEphemeral(path, data)
       }
     }
   }
@@ -312,18 +315,19 @@ object ZkUtils extends Logging {
    * Create an persistent node with the given path and data. Create parents if necessary.
    */
   def createPersistentPath(client: ZkClient, path: String, data: String = ""): Unit = {
+    val zkPath = new ZkPath(client)
     try {
-      client.createPersistent(path, data)
+      zkPath.createPersistent(path, data)
     } catch {
       case e: ZkNoNodeException => {
         createParentPath(client, path)
-        client.createPersistent(path, data)
+        zkPath.createPersistent(path, data)
       }
     }
   }
 
   def createSequentialPersistentPath(client: ZkClient, path: String, data: String = ""): String = {
-    client.createPersistentSequential(path, data)
+    new ZkPath(client).createPersistentSequential(path, data)
   }
 
   /**
@@ -338,7 +342,7 @@ object ZkUtils extends Logging {
       case e: ZkNoNodeException => {
         createParentPath(client, path)
         try {
-          client.createPersistent(path, data)
+          new ZkPath(client).createPersistent(path, data)
         } catch {
           case e: ZkNodeExistsException =>
             client.writeData(path, data)
@@ -409,7 +413,7 @@ object ZkUtils extends Logging {
     } catch {
       case e: ZkNoNodeException => {
         createParentPath(client, path)
-        client.createEphemeral(path, data)
+        new ZkPath(client).createEphemeral(path, data)
       }
       case e2: Throwable => throw e2
     }
@@ -806,3 +810,25 @@ class ZKConfig(props: VerifiableProperties) {
   /** how far a ZK follower can be behind a ZK leader */
   val zkSyncTimeMs = props.getInt("zookeeper.sync.time.ms", 2000)
 }
+
+class ZkPath(client: ZkClient) {
+  if (!client.exists("/")) {
+    throw new ConfigException("Zookeeper namespace does not exist")
+  }
+
+  def createPersistent(path: String, data: Object) {
+    client.createPersistent(path, data)
+  }
+
+  def createPersistent(path: String, createParents: Boolean) {
+    client.createPersistent(path, createParents)
+  }
+
+  def createEphemeral(path: String, data: Object) {
+    client.createEphemeral(path, data)
+  }
+
+  def createPersistentSequential(path: String, data: Object): String = {
+    client.createPersistentSequential(path, data)
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/b56f5973/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala b/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala
new file mode 100644
index 0000000..9897b2f
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package unit.kafka.zk
+
+import junit.framework.Assert
+import kafka.consumer.ConsumerConfig
+import kafka.utils.{TestUtils, ZKStringSerializer, ZkUtils}
+import kafka.zk.ZooKeeperTestHarness
+import org.I0Itec.zkclient.ZkClient
+import org.apache.kafka.common.config.ConfigException
+import org.scalatest.junit.JUnit3Suite
+
+class ZKPathTest extends JUnit3Suite with ZooKeeperTestHarness {
+
+  val path: String = "/some_dir"
+  val zkSessionTimeoutMs = 1000
+  val zkConnectWithInvalidRoot: String = zkConnect + "/ghost"
+
+  def testCreatePersistentPathThrowsException {
+    val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot,
+      "test", "1"))
+    var zkClient = new ZkClient(zkConnectWithInvalidRoot, zkSessionTimeoutMs,
+      config.zkConnectionTimeoutMs,
+      ZKStringSerializer)
+    try {
+      ZkUtils.createPersistentPath(zkClient, path)
+      fail("Failed to throw ConfigException for missing zookeeper root node")
+    } catch {
+      case configException: ConfigException =>
+      case exception: Throwable => fail("Should have thrown ConfigException")
+    }
+  }
+
+  def testCreatePersistentPath {
+    val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
+    var zkClient = new ZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
+      ZKStringSerializer)
+    try {
+      ZkUtils.createPersistentPath(zkClient, path)
+    } catch {
+      case exception: Throwable => fail("Failed to create persistent path")
+    }
+
+    Assert.assertTrue("Failed to create persistent path", ZkUtils.pathExists(zkClient, path));
+  }
+
+  def testMakeSurePersistsPathExistsThrowsException {
+    val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot,
+      "test", "1"))
+    var zkClient = new ZkClient(zkConnectWithInvalidRoot, zkSessionTimeoutMs,
+      config.zkConnectionTimeoutMs,
+      ZKStringSerializer)
+    try {
+      ZkUtils.makeSurePersistentPathExists(zkClient, path)
+      fail("Failed to throw ConfigException for missing zookeeper root node")
+    } catch {
+      case configException: ConfigException =>
+      case exception: Throwable => fail("Should have thrown ConfigException")
+    }
+  }
+
+  def testMakeSurePersistsPathExists {
+    val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
+    var zkClient = new ZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
+      ZKStringSerializer)
+    try {
+      ZkUtils.makeSurePersistentPathExists(zkClient, path)
+    } catch {
+      case exception: Throwable => fail("Failed to create persistent path")
+    }
+
+    Assert.assertTrue("Failed to create persistent path", ZkUtils.pathExists(zkClient, path));
+  }
+
+  def testCreateEphemeralPathThrowsException {
+    val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot,
+      "test", "1"))
+    var zkClient = new ZkClient(zkConnectWithInvalidRoot, zkSessionTimeoutMs,
+      config.zkConnectionTimeoutMs,
+      ZKStringSerializer)
+    try {
+      ZkUtils.createEphemeralPathExpectConflict(zkClient, path, "somedata")
+      fail("Failed to throw ConfigException for missing zookeeper root node")
+    } catch {
+      case configException: ConfigException =>
+      case exception: Throwable => fail("Should have thrown ConfigException")
+    }
+  }
+
+  def testCreateEphemeralPathExists {
+    val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
+    var zkClient = new ZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
+      ZKStringSerializer)
+    try {
+      ZkUtils.createEphemeralPathExpectConflict(zkClient, path, "somedata")
+    } catch {
+      case exception: Throwable => fail("Failed to create ephemeral path")
+    }
+
+    Assert.assertTrue("Failed to create ephemeral path", ZkUtils.pathExists(zkClient, path));
+  }
+
+  def testCreatePersistentSequentialThrowsException {
+    val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot,
+      "test", "1"))
+    var zkClient = new ZkClient(zkConnectWithInvalidRoot, zkSessionTimeoutMs,
+      config.zkConnectionTimeoutMs,
+      ZKStringSerializer)
+    try {
+      ZkUtils.createSequentialPersistentPath(zkClient, path)
+      fail("Failed to throw ConfigException for missing zookeeper root node")
+    } catch {
+      case configException: ConfigException =>
+      case exception: Throwable => fail("Should have thrown ConfigException")
+    }
+  }
+
+  def testCreatePersistentSequentialExists {
+    val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
+    var zkClient = new ZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
+      ZKStringSerializer)
+
+    var actualPath: String = ""
+    try {
+      actualPath = ZkUtils.createSequentialPersistentPath(zkClient, path)
+    } catch {
+      case exception: Throwable => fail("Failed to create persistent path")
+    }
+
+    Assert.assertTrue("Failed to create persistent path", ZkUtils.pathExists(zkClient, actualPath));
+  }
+}