You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2015/02/28 00:10:11 UTC
kafka git commit: KAFKA-1664 Kafka does not properly parse multiple
ZK nodes with non-root chroot; reviewed by Neha Narkhede and Jun Rao
Repository: kafka
Updated Branches:
refs/heads/trunk a864172da -> b56f5973c
KAFKA-1664 Kafka does not properly parse multiple ZK nodes with non-root chroot; reviewed by Neha Narkhede and Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b56f5973
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b56f5973
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b56f5973
Branch: refs/heads/trunk
Commit: b56f5973c739072350f3f6bf6efa4eb05bc692bf
Parents: a864172
Author: Ashish Singh <as...@cloudera.com>
Authored: Fri Feb 27 15:09:41 2015 -0800
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Fri Feb 27 15:10:01 2015 -0800
----------------------------------------------------------------------
core/src/main/scala/kafka/utils/ZkUtils.scala | 46 ++++--
.../test/scala/unit/kafka/zk/ZKPathTest.scala | 147 +++++++++++++++++++
2 files changed, 183 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/b56f5973/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 8a2fb2d..7ae999e 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -23,6 +23,7 @@ import org.I0Itec.zkclient.ZkClient
import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException,
ZkMarshallingError, ZkBadVersionException}
import org.I0Itec.zkclient.serialize.ZkSerializer
+import org.apache.kafka.common.config.ConfigException
import collection._
import kafka.api.LeaderAndIsr
import org.apache.zookeeper.data.Stat
@@ -212,7 +213,7 @@ object ZkUtils extends Logging {
*/
def makeSurePersistentPathExists(client: ZkClient, path: String) {
if (!client.exists(path))
- client.createPersistent(path, true) // won't throw NoNodeException or NodeExistsException
+ new ZkPath(client).createPersistent(path, true) // won't throw NoNodeException or NodeExistsException
}
/**
@@ -220,20 +221,22 @@ object ZkUtils extends Logging {
*/
private def createParentPath(client: ZkClient, path: String): Unit = {
val parentDir = path.substring(0, path.lastIndexOf('/'))
- if (parentDir.length != 0)
- client.createPersistent(parentDir, true)
+ if (parentDir.length != 0) {
+ new ZkPath(client).createPersistent(parentDir, true)
+ }
}
/**
* Create an ephemeral node with the given path and data. Create parents if necessary.
*/
private def createEphemeralPath(client: ZkClient, path: String, data: String): Unit = {
+ val zkPath = new ZkPath(client)
try {
- client.createEphemeral(path, data)
+ zkPath.createEphemeral(path, data)
} catch {
case e: ZkNoNodeException => {
createParentPath(client, path)
- client.createEphemeral(path, data)
+ zkPath.createEphemeral(path, data)
}
}
}
@@ -312,18 +315,19 @@ object ZkUtils extends Logging {
* Create an persistent node with the given path and data. Create parents if necessary.
*/
def createPersistentPath(client: ZkClient, path: String, data: String = ""): Unit = {
+ val zkPath = new ZkPath(client)
try {
- client.createPersistent(path, data)
+ zkPath.createPersistent(path, data)
} catch {
case e: ZkNoNodeException => {
createParentPath(client, path)
- client.createPersistent(path, data)
+ zkPath.createPersistent(path, data)
}
}
}
def createSequentialPersistentPath(client: ZkClient, path: String, data: String = ""): String = {
- client.createPersistentSequential(path, data)
+ new ZkPath(client).createPersistentSequential(path, data)
}
/**
@@ -338,7 +342,7 @@ object ZkUtils extends Logging {
case e: ZkNoNodeException => {
createParentPath(client, path)
try {
- client.createPersistent(path, data)
+ new ZkPath(client).createPersistent(path, data)
} catch {
case e: ZkNodeExistsException =>
client.writeData(path, data)
@@ -409,7 +413,7 @@ object ZkUtils extends Logging {
} catch {
case e: ZkNoNodeException => {
createParentPath(client, path)
- client.createEphemeral(path, data)
+ new ZkPath(client).createEphemeral(path, data)
}
case e2: Throwable => throw e2
}
@@ -806,3 +810,25 @@ class ZKConfig(props: VerifiableProperties) {
/** how far a ZK follower can be behind a ZK leader */
val zkSyncTimeMs = props.getInt("zookeeper.sync.time.ms", 2000)
}
+
+class ZkPath(client: ZkClient) {
+ if (!client.exists("/")) {
+ throw new ConfigException("Zookeeper namespace does not exist")
+ }
+
+ def createPersistent(path: String, data: Object) {
+ client.createPersistent(path, data)
+ }
+
+ def createPersistent(path: String, createParents: Boolean) {
+ client.createPersistent(path, createParents)
+ }
+
+ def createEphemeral(path: String, data: Object) {
+ client.createEphemeral(path, data)
+ }
+
+ def createPersistentSequential(path: String, data: Object): String = {
+ client.createPersistentSequential(path, data)
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/b56f5973/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala b/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala
new file mode 100644
index 0000000..9897b2f
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package unit.kafka.zk
+
+import junit.framework.Assert
+import kafka.consumer.ConsumerConfig
+import kafka.utils.{TestUtils, ZKStringSerializer, ZkUtils}
+import kafka.zk.ZooKeeperTestHarness
+import org.I0Itec.zkclient.ZkClient
+import org.apache.kafka.common.config.ConfigException
+import org.scalatest.junit.JUnit3Suite
+
+class ZKPathTest extends JUnit3Suite with ZooKeeperTestHarness {
+
+ val path: String = "/some_dir"
+ val zkSessionTimeoutMs = 1000
+ val zkConnectWithInvalidRoot: String = zkConnect + "/ghost"
+
+ def testCreatePersistentPathThrowsException {
+ val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot,
+ "test", "1"))
+ var zkClient = new ZkClient(zkConnectWithInvalidRoot, zkSessionTimeoutMs,
+ config.zkConnectionTimeoutMs,
+ ZKStringSerializer)
+ try {
+ ZkUtils.createPersistentPath(zkClient, path)
+ fail("Failed to throw ConfigException for missing zookeeper root node")
+ } catch {
+ case configException: ConfigException =>
+ case exception: Throwable => fail("Should have thrown ConfigException")
+ }
+ }
+
+ def testCreatePersistentPath {
+ val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
+ var zkClient = new ZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
+ ZKStringSerializer)
+ try {
+ ZkUtils.createPersistentPath(zkClient, path)
+ } catch {
+ case exception: Throwable => fail("Failed to create persistent path")
+ }
+
+ Assert.assertTrue("Failed to create persistent path", ZkUtils.pathExists(zkClient, path));
+ }
+
+ def testMakeSurePersistsPathExistsThrowsException {
+ val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot,
+ "test", "1"))
+ var zkClient = new ZkClient(zkConnectWithInvalidRoot, zkSessionTimeoutMs,
+ config.zkConnectionTimeoutMs,
+ ZKStringSerializer)
+ try {
+ ZkUtils.makeSurePersistentPathExists(zkClient, path)
+ fail("Failed to throw ConfigException for missing zookeeper root node")
+ } catch {
+ case configException: ConfigException =>
+ case exception: Throwable => fail("Should have thrown ConfigException")
+ }
+ }
+
+ def testMakeSurePersistsPathExists {
+ val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
+ var zkClient = new ZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
+ ZKStringSerializer)
+ try {
+ ZkUtils.makeSurePersistentPathExists(zkClient, path)
+ } catch {
+ case exception: Throwable => fail("Failed to create persistent path")
+ }
+
+ Assert.assertTrue("Failed to create persistent path", ZkUtils.pathExists(zkClient, path));
+ }
+
+ def testCreateEphemeralPathThrowsException {
+ val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot,
+ "test", "1"))
+ var zkClient = new ZkClient(zkConnectWithInvalidRoot, zkSessionTimeoutMs,
+ config.zkConnectionTimeoutMs,
+ ZKStringSerializer)
+ try {
+ ZkUtils.createEphemeralPathExpectConflict(zkClient, path, "somedata")
+ fail("Failed to throw ConfigException for missing zookeeper root node")
+ } catch {
+ case configException: ConfigException =>
+ case exception: Throwable => fail("Should have thrown ConfigException")
+ }
+ }
+
+ def testCreateEphemeralPathExists {
+ val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
+ var zkClient = new ZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
+ ZKStringSerializer)
+ try {
+ ZkUtils.createEphemeralPathExpectConflict(zkClient, path, "somedata")
+ } catch {
+ case exception: Throwable => fail("Failed to create ephemeral path")
+ }
+
+ Assert.assertTrue("Failed to create ephemeral path", ZkUtils.pathExists(zkClient, path));
+ }
+
+ def testCreatePersistentSequentialThrowsException {
+ val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot,
+ "test", "1"))
+ var zkClient = new ZkClient(zkConnectWithInvalidRoot, zkSessionTimeoutMs,
+ config.zkConnectionTimeoutMs,
+ ZKStringSerializer)
+ try {
+ ZkUtils.createSequentialPersistentPath(zkClient, path)
+ fail("Failed to throw ConfigException for missing zookeeper root node")
+ } catch {
+ case configException: ConfigException =>
+ case exception: Throwable => fail("Should have thrown ConfigException")
+ }
+ }
+
+ def testCreatePersistentSequentialExists {
+ val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
+ var zkClient = new ZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
+ ZKStringSerializer)
+
+ var actualPath: String = ""
+ try {
+ actualPath = ZkUtils.createSequentialPersistentPath(zkClient, path)
+ } catch {
+ case exception: Throwable => fail("Failed to create persistent path")
+ }
+
+ Assert.assertTrue("Failed to create persistent path", ZkUtils.pathExists(zkClient, actualPath));
+ }
+}