You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/04/19 10:36:49 UTC

kafka git commit: KAFKA-5049; Chroot check should be done for each ZkUtils instance

Repository: kafka
Updated Branches:
  refs/heads/trunk c4e59a338 -> 41c0f8add


KAFKA-5049; Chroot check should be done for each ZkUtils instance

Author: anukin <an...@gmail.com>

Reviewers: Ismael Juma <is...@juma.me.uk>

Closes #2857 from anukin/KAFKA_5049_zkroot_check


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/41c0f8ad
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/41c0f8ad
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/41c0f8ad

Branch: refs/heads/trunk
Commit: 41c0f8addeef44be23524bade61bcb2ab6077706
Parents: c4e59a3
Author: anukin <an...@gmail.com>
Authored: Wed Apr 19 11:25:30 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Wed Apr 19 11:30:55 2017 +0100

----------------------------------------------------------------------
 core/src/main/scala/kafka/utils/ZkUtils.scala   | 46 ++++++++++----------
 .../test/scala/unit/kafka/zk/ZKPathTest.scala   | 18 ++++----
 2 files changed, 33 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/41c0f8ad/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 724414e..4e2b11a 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -65,8 +65,6 @@ object ZkUtils {
   val ConfigChangesPath = s"$ConfigPath/changes"
   val ConfigUsersPath = s"$ConfigPath/users"
   val PidBlockPath = "/latest_pid_block"
-
-
   // Important: it is necessary to add any new top level Zookeeper path to the Seq
   val SecureZkRootPaths = Seq(AdminPath,
                               BrokersPath,
@@ -243,6 +241,9 @@ class ZkUtils(val zkClient: ZkClient,
                               IsrChangeNotificationPath,
                               PidBlockPath)
 
+  // Visible for testing
+  val zkPath = new ZkPath(zkClient)
+
   import ZkUtils._
 
   @deprecated("This is deprecated, use defaultAcls(path) which doesn't make sensitive data world readable", since = "0.10.2.1")
@@ -451,7 +452,7 @@ class ZkUtils(val zkClient: ZkClient,
     }
 
     if (!zkClient.exists(path))
-      ZkPath.createPersistent(zkClient, path, createParents = true, acl) //won't throw NoNodeException or NodeExistsException
+      zkPath.createPersistent(path, createParents = true, acl) //won't throw NoNodeException or NodeExistsException
   }
 
   /**
@@ -461,7 +462,7 @@ class ZkUtils(val zkClient: ZkClient,
     val acl = if (acls eq UseDefaultAcls) ZkUtils.defaultAcls(isSecure, path) else acls
     val parentDir = path.substring(0, path.lastIndexOf('/'))
     if (parentDir.length != 0) {
-      ZkPath.createPersistent(zkClient, parentDir, createParents = true, acl)
+      zkPath.createPersistent(parentDir, createParents = true, acl)
     }
   }
 
@@ -471,11 +472,11 @@ class ZkUtils(val zkClient: ZkClient,
   private def createEphemeralPath(path: String, data: String, acls: java.util.List[ACL] = UseDefaultAcls): Unit = {
     val acl = if (acls eq UseDefaultAcls) ZkUtils.defaultAcls(isSecure, path) else acls
     try {
-      ZkPath.createEphemeral(zkClient, path, data, acl)
+      zkPath.createEphemeral(path, data, acl)
     } catch {
       case _: ZkNoNodeException =>
         createParentPath(path)
-        ZkPath.createEphemeral(zkClient, path, data, acl)
+        zkPath.createEphemeral(path, data, acl)
     }
   }
 
@@ -512,17 +513,17 @@ class ZkUtils(val zkClient: ZkClient,
   def createPersistentPath(path: String, data: String = "", acls: java.util.List[ACL] = UseDefaultAcls): Unit = {
     val acl = if (acls eq UseDefaultAcls) ZkUtils.defaultAcls(isSecure, path) else acls
     try {
-      ZkPath.createPersistent(zkClient, path, data, acl)
+      zkPath.createPersistent(path, data, acl)
     } catch {
       case _: ZkNoNodeException =>
         createParentPath(path)
-        ZkPath.createPersistent(zkClient, path, data, acl)
+        zkPath.createPersistent(path, data, acl)
     }
   }
 
   def createSequentialPersistentPath(path: String, data: String = "", acls: java.util.List[ACL] = UseDefaultAcls): String = {
     val acl = if (acls eq UseDefaultAcls) ZkUtils.defaultAcls(isSecure, path) else acls
-    ZkPath.createPersistentSequential(zkClient, path, data, acl)
+    zkPath.createPersistentSequential(path, data, acl)
   }
 
   /**
@@ -538,7 +539,7 @@ class ZkUtils(val zkClient: ZkClient,
       case _: ZkNoNodeException =>
         createParentPath(path)
         try {
-          ZkPath.createPersistent(zkClient, path, data, acl)
+          zkPath.createPersistent(path, data, acl)
         } catch {
           case _: ZkNodeExistsException =>
             zkClient.writeData(path, data)
@@ -608,7 +609,7 @@ class ZkUtils(val zkClient: ZkClient,
     } catch {
       case _: ZkNoNodeException =>
         createParentPath(path)
-        ZkPath.createEphemeral(zkClient, path, data, acl)
+        zkPath.createEphemeral(path, data, acl)
     }
   }
 
@@ -993,11 +994,12 @@ class ZKConfig(props: VerifiableProperties) {
   val zkSyncTimeMs = props.getInt(ZkSyncTimeMsProp, 2000)
 }
 
-object ZkPath {
+class ZkPath(client: ZkClient) {
+
   @volatile private var isNamespacePresent: Boolean = false
 
-  def checkNamespace(client: ZkClient) {
-    if(isNamespacePresent)
+  def checkNamespace() {
+    if (isNamespacePresent)
       return
 
     if (!client.exists("/")) {
@@ -1010,23 +1012,23 @@ object ZkPath {
     isNamespacePresent = false
   }
 
-  def createPersistent(client: ZkClient, path: String, data: Object, acls: java.util.List[ACL]) {
-    checkNamespace(client)
+  def createPersistent(path: String, data: Object, acls: java.util.List[ACL]) {
+    checkNamespace()
     client.createPersistent(path, data, acls)
   }
 
-  def createPersistent(client: ZkClient, path: String, createParents: Boolean, acls: java.util.List[ACL]) {
-    checkNamespace(client)
+  def createPersistent(path: String, createParents: Boolean, acls: java.util.List[ACL]) {
+    checkNamespace()
     client.createPersistent(path, createParents, acls)
   }
 
-  def createEphemeral(client: ZkClient, path: String, data: Object, acls: java.util.List[ACL]) {
-    checkNamespace(client)
+  def createEphemeral(path: String, data: Object, acls: java.util.List[ACL]) {
+    checkNamespace()
     client.createEphemeral(path, data, acls)
   }
 
-  def createPersistentSequential(client: ZkClient, path: String, data: Object, acls: java.util.List[ACL]): String = {
-    checkNamespace(client)
+  def createPersistentSequential(path: String, data: Object, acls: java.util.List[ACL]): String = {
+    checkNamespace()
     client.createPersistentSequential(path, data, acls)
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/41c0f8ad/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala b/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala
index d8f0de4..04f8aaf 100644
--- a/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala
@@ -18,7 +18,7 @@
 package kafka.zk
 
 import kafka.consumer.ConsumerConfig
-import kafka.utils.{ZkPath, TestUtils, ZkUtils}
+import kafka.utils.{TestUtils, ZkUtils}
 import org.apache.kafka.common.config.ConfigException
 import org.junit.Assert._
 import org.junit.Test
@@ -36,7 +36,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
     val zkUtils = ZkUtils(zkConnectWithInvalidRoot, zkSessionTimeoutMs,
       config.zkConnectionTimeoutMs, false)
     try {
-      ZkPath.resetNamespaceCheckedState
+      zkUtils.zkPath.resetNamespaceCheckedState
       zkUtils.createPersistentPath(path)
       fail("Failed to throw ConfigException for missing zookeeper root node")
     } catch {
@@ -49,7 +49,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
   def testCreatePersistentPath {
     val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
     val zkUtils = ZkUtils(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false)
-    ZkPath.resetNamespaceCheckedState
+    zkUtils.zkPath.resetNamespaceCheckedState
     zkUtils.createPersistentPath(path)
     assertTrue("Failed to create persistent path", zkUtils.pathExists(path))
     zkUtils.close()
@@ -60,7 +60,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
     val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot, "test", "1"))
     val zkUtils = ZkUtils(zkConnectWithInvalidRoot, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false)
     try {
-      ZkPath.resetNamespaceCheckedState
+      zkUtils.zkPath.resetNamespaceCheckedState
       zkUtils.makeSurePersistentPathExists(path)
       fail("Failed to throw ConfigException for missing zookeeper root node")
     } catch {
@@ -73,7 +73,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
   def testMakeSurePersistsPathExists {
     val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
     val zkUtils = ZkUtils(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false)
-    ZkPath.resetNamespaceCheckedState
+    zkUtils.zkPath.resetNamespaceCheckedState
     zkUtils.makeSurePersistentPathExists(path)
     assertTrue("Failed to create persistent path", zkUtils.pathExists(path))
     zkUtils.close()
@@ -84,7 +84,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
     val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot, "test", "1"))
     val zkUtils = ZkUtils(zkConnectWithInvalidRoot, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false)
     try {
-      ZkPath.resetNamespaceCheckedState
+      zkUtils.zkPath.resetNamespaceCheckedState
       zkUtils.createEphemeralPathExpectConflict(path, "somedata")
       fail("Failed to throw ConfigException for missing zookeeper root node")
     } catch {
@@ -97,7 +97,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
   def testCreateEphemeralPathExists {
     val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
     val zkUtils = ZkUtils(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false)
-    ZkPath.resetNamespaceCheckedState
+    zkUtils.zkPath.resetNamespaceCheckedState
     zkUtils.createEphemeralPathExpectConflict(path, "somedata")
     assertTrue("Failed to create ephemeral path", zkUtils.pathExists(path))
     zkUtils.close()
@@ -109,7 +109,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
       "test", "1"))
     val zkUtils = ZkUtils(zkConnectWithInvalidRoot, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false)
     try {
-      ZkPath.resetNamespaceCheckedState
+      zkUtils.zkPath.resetNamespaceCheckedState
       zkUtils.createSequentialPersistentPath(path)
       fail("Failed to throw ConfigException for missing zookeeper root node")
     } catch {
@@ -122,7 +122,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
   def testCreatePersistentSequentialExists {
     val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
     val zkUtils = ZkUtils(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false)
-    ZkPath.resetNamespaceCheckedState
+    zkUtils.zkPath.resetNamespaceCheckedState
     val actualPath = zkUtils.createSequentialPersistentPath(path)
     assertTrue("Failed to create persistent path", zkUtils.pathExists(actualPath))
     zkUtils.close()