You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/04/19 10:36:49 UTC
kafka git commit: KAFKA-5049;
Chroot check should be done for each ZkUtils instance
Repository: kafka
Updated Branches:
refs/heads/trunk c4e59a338 -> 41c0f8add
KAFKA-5049; Chroot check should be done for each ZkUtils instance
Author: anukin <an...@gmail.com>
Reviewers: Ismael Juma <is...@juma.me.uk>
Closes #2857 from anukin/KAFKA_5049_zkroot_check
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/41c0f8ad
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/41c0f8ad
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/41c0f8ad
Branch: refs/heads/trunk
Commit: 41c0f8addeef44be23524bade61bcb2ab6077706
Parents: c4e59a3
Author: anukin <an...@gmail.com>
Authored: Wed Apr 19 11:25:30 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Wed Apr 19 11:30:55 2017 +0100
----------------------------------------------------------------------
core/src/main/scala/kafka/utils/ZkUtils.scala | 46 ++++++++++----------
.../test/scala/unit/kafka/zk/ZKPathTest.scala | 18 ++++----
2 files changed, 33 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/41c0f8ad/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 724414e..4e2b11a 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -65,8 +65,6 @@ object ZkUtils {
val ConfigChangesPath = s"$ConfigPath/changes"
val ConfigUsersPath = s"$ConfigPath/users"
val PidBlockPath = "/latest_pid_block"
-
-
// Important: it is necessary to add any new top level Zookeeper path to the Seq
val SecureZkRootPaths = Seq(AdminPath,
BrokersPath,
@@ -243,6 +241,9 @@ class ZkUtils(val zkClient: ZkClient,
IsrChangeNotificationPath,
PidBlockPath)
+ // Visible for testing
+ val zkPath = new ZkPath(zkClient)
+
import ZkUtils._
@deprecated("This is deprecated, use defaultAcls(path) which doesn't make sensitive data world readable", since = "0.10.2.1")
@@ -451,7 +452,7 @@ class ZkUtils(val zkClient: ZkClient,
}
if (!zkClient.exists(path))
- ZkPath.createPersistent(zkClient, path, createParents = true, acl) //won't throw NoNodeException or NodeExistsException
+ zkPath.createPersistent(path, createParents = true, acl) //won't throw NoNodeException or NodeExistsException
}
/**
@@ -461,7 +462,7 @@ class ZkUtils(val zkClient: ZkClient,
val acl = if (acls eq UseDefaultAcls) ZkUtils.defaultAcls(isSecure, path) else acls
val parentDir = path.substring(0, path.lastIndexOf('/'))
if (parentDir.length != 0) {
- ZkPath.createPersistent(zkClient, parentDir, createParents = true, acl)
+ zkPath.createPersistent(parentDir, createParents = true, acl)
}
}
@@ -471,11 +472,11 @@ class ZkUtils(val zkClient: ZkClient,
private def createEphemeralPath(path: String, data: String, acls: java.util.List[ACL] = UseDefaultAcls): Unit = {
val acl = if (acls eq UseDefaultAcls) ZkUtils.defaultAcls(isSecure, path) else acls
try {
- ZkPath.createEphemeral(zkClient, path, data, acl)
+ zkPath.createEphemeral(path, data, acl)
} catch {
case _: ZkNoNodeException =>
createParentPath(path)
- ZkPath.createEphemeral(zkClient, path, data, acl)
+ zkPath.createEphemeral(path, data, acl)
}
}
@@ -512,17 +513,17 @@ class ZkUtils(val zkClient: ZkClient,
def createPersistentPath(path: String, data: String = "", acls: java.util.List[ACL] = UseDefaultAcls): Unit = {
val acl = if (acls eq UseDefaultAcls) ZkUtils.defaultAcls(isSecure, path) else acls
try {
- ZkPath.createPersistent(zkClient, path, data, acl)
+ zkPath.createPersistent(path, data, acl)
} catch {
case _: ZkNoNodeException =>
createParentPath(path)
- ZkPath.createPersistent(zkClient, path, data, acl)
+ zkPath.createPersistent(path, data, acl)
}
}
def createSequentialPersistentPath(path: String, data: String = "", acls: java.util.List[ACL] = UseDefaultAcls): String = {
val acl = if (acls eq UseDefaultAcls) ZkUtils.defaultAcls(isSecure, path) else acls
- ZkPath.createPersistentSequential(zkClient, path, data, acl)
+ zkPath.createPersistentSequential(path, data, acl)
}
/**
@@ -538,7 +539,7 @@ class ZkUtils(val zkClient: ZkClient,
case _: ZkNoNodeException =>
createParentPath(path)
try {
- ZkPath.createPersistent(zkClient, path, data, acl)
+ zkPath.createPersistent(path, data, acl)
} catch {
case _: ZkNodeExistsException =>
zkClient.writeData(path, data)
@@ -608,7 +609,7 @@ class ZkUtils(val zkClient: ZkClient,
} catch {
case _: ZkNoNodeException =>
createParentPath(path)
- ZkPath.createEphemeral(zkClient, path, data, acl)
+ zkPath.createEphemeral(path, data, acl)
}
}
@@ -993,11 +994,12 @@ class ZKConfig(props: VerifiableProperties) {
val zkSyncTimeMs = props.getInt(ZkSyncTimeMsProp, 2000)
}
-object ZkPath {
+class ZkPath(client: ZkClient) {
+
@volatile private var isNamespacePresent: Boolean = false
- def checkNamespace(client: ZkClient) {
- if(isNamespacePresent)
+ def checkNamespace() {
+ if (isNamespacePresent)
return
if (!client.exists("/")) {
@@ -1010,23 +1012,23 @@ object ZkPath {
isNamespacePresent = false
}
- def createPersistent(client: ZkClient, path: String, data: Object, acls: java.util.List[ACL]) {
- checkNamespace(client)
+ def createPersistent(path: String, data: Object, acls: java.util.List[ACL]) {
+ checkNamespace()
client.createPersistent(path, data, acls)
}
- def createPersistent(client: ZkClient, path: String, createParents: Boolean, acls: java.util.List[ACL]) {
- checkNamespace(client)
+ def createPersistent(path: String, createParents: Boolean, acls: java.util.List[ACL]) {
+ checkNamespace()
client.createPersistent(path, createParents, acls)
}
- def createEphemeral(client: ZkClient, path: String, data: Object, acls: java.util.List[ACL]) {
- checkNamespace(client)
+ def createEphemeral(path: String, data: Object, acls: java.util.List[ACL]) {
+ checkNamespace()
client.createEphemeral(path, data, acls)
}
- def createPersistentSequential(client: ZkClient, path: String, data: Object, acls: java.util.List[ACL]): String = {
- checkNamespace(client)
+ def createPersistentSequential(path: String, data: Object, acls: java.util.List[ACL]): String = {
+ checkNamespace()
client.createPersistentSequential(path, data, acls)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/41c0f8ad/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala b/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala
index d8f0de4..04f8aaf 100644
--- a/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala
@@ -18,7 +18,7 @@
package kafka.zk
import kafka.consumer.ConsumerConfig
-import kafka.utils.{ZkPath, TestUtils, ZkUtils}
+import kafka.utils.{TestUtils, ZkUtils}
import org.apache.kafka.common.config.ConfigException
import org.junit.Assert._
import org.junit.Test
@@ -36,7 +36,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
val zkUtils = ZkUtils(zkConnectWithInvalidRoot, zkSessionTimeoutMs,
config.zkConnectionTimeoutMs, false)
try {
- ZkPath.resetNamespaceCheckedState
+ zkUtils.zkPath.resetNamespaceCheckedState
zkUtils.createPersistentPath(path)
fail("Failed to throw ConfigException for missing zookeeper root node")
} catch {
@@ -49,7 +49,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
def testCreatePersistentPath {
val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
val zkUtils = ZkUtils(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false)
- ZkPath.resetNamespaceCheckedState
+ zkUtils.zkPath.resetNamespaceCheckedState
zkUtils.createPersistentPath(path)
assertTrue("Failed to create persistent path", zkUtils.pathExists(path))
zkUtils.close()
@@ -60,7 +60,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot, "test", "1"))
val zkUtils = ZkUtils(zkConnectWithInvalidRoot, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false)
try {
- ZkPath.resetNamespaceCheckedState
+ zkUtils.zkPath.resetNamespaceCheckedState
zkUtils.makeSurePersistentPathExists(path)
fail("Failed to throw ConfigException for missing zookeeper root node")
} catch {
@@ -73,7 +73,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
def testMakeSurePersistsPathExists {
val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
val zkUtils = ZkUtils(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false)
- ZkPath.resetNamespaceCheckedState
+ zkUtils.zkPath.resetNamespaceCheckedState
zkUtils.makeSurePersistentPathExists(path)
assertTrue("Failed to create persistent path", zkUtils.pathExists(path))
zkUtils.close()
@@ -84,7 +84,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot, "test", "1"))
val zkUtils = ZkUtils(zkConnectWithInvalidRoot, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false)
try {
- ZkPath.resetNamespaceCheckedState
+ zkUtils.zkPath.resetNamespaceCheckedState
zkUtils.createEphemeralPathExpectConflict(path, "somedata")
fail("Failed to throw ConfigException for missing zookeeper root node")
} catch {
@@ -97,7 +97,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
def testCreateEphemeralPathExists {
val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
val zkUtils = ZkUtils(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false)
- ZkPath.resetNamespaceCheckedState
+ zkUtils.zkPath.resetNamespaceCheckedState
zkUtils.createEphemeralPathExpectConflict(path, "somedata")
assertTrue("Failed to create ephemeral path", zkUtils.pathExists(path))
zkUtils.close()
@@ -109,7 +109,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
"test", "1"))
val zkUtils = ZkUtils(zkConnectWithInvalidRoot, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false)
try {
- ZkPath.resetNamespaceCheckedState
+ zkUtils.zkPath.resetNamespaceCheckedState
zkUtils.createSequentialPersistentPath(path)
fail("Failed to throw ConfigException for missing zookeeper root node")
} catch {
@@ -122,7 +122,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
def testCreatePersistentSequentialExists {
val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
val zkUtils = ZkUtils(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false)
- ZkPath.resetNamespaceCheckedState
+ zkUtils.zkPath.resetNamespaceCheckedState
val actualPath = zkUtils.createSequentialPersistentPath(path)
assertTrue("Failed to create persistent path", zkUtils.pathExists(actualPath))
zkUtils.close()