You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2014/09/26 06:08:15 UTC

git commit: KAFKA-404 When using chroot path, create chroot on startup if it doesn't exist; reviewed by Neha Narkhede

Repository: kafka
Updated Branches:
  refs/heads/trunk 084566b83 -> f750dba65


KAFKA-404 When using chroot path, create chroot on startup if it doesn't exist; reviewed by Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f750dba6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f750dba6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f750dba6

Branch: refs/heads/trunk
Commit: f750dba65f9d9552a61a0754c46fa6e294785b31
Parents: 084566b
Author: Jonathan Creasy <jo...@ghostlab.net>
Authored: Thu Sep 25 13:18:12 2014 -0700
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Thu Sep 25 17:49:29 2014 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/server/KafkaServer.scala   | 20 +++++++-
 .../unit/kafka/server/ServerStartupTest.scala   | 54 ++++++++++++++++++++
 2 files changed, 73 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f750dba6/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 390fef5..14a7ba5 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -25,6 +25,7 @@ import kafka.utils._
 import java.util.concurrent._
 import atomic.{AtomicInteger, AtomicBoolean}
 import java.io.File
+import java.net.BindException
 import org.I0Itec.zkclient.ZkClient
 import kafka.controller.{ControllerStats, KafkaController}
 import kafka.cluster.Broker
@@ -126,14 +127,31 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
     startupComplete.set(true)
     info("started")
   }
-  
+
   private def initZk(): ZkClient = {
     info("Connecting to zookeeper on " + config.zkConnect)
+
+    // KAFKA-404: when the connect string names a chroot, create it on startup through a
+    // temporary chroot-less session; the chroot is everything from the first "/" onward.
+    val chrootIndex = config.zkConnect.indexOf("/")
+    val chroot = if (chrootIndex > 0) config.zkConnect.substring(chrootIndex) else ""
+    if (chroot.length > 1) { // a bare "/" (or no chroot at all) leaves nothing to create
+      val zkConnForChrootCreation = config.zkConnect.substring(0, chrootIndex)
+      val zkClientForChrootCreation = new ZkClient(zkConnForChrootCreation, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer)
+      try {
+        ZkUtils.makeSurePersistentPathExists(zkClientForChrootCreation, chroot)
+        info("Created zookeeper path " + chroot)
+      } finally {
+        zkClientForChrootCreation.close() // always release the temporary session, even on failure
+      }
+    }
+
     val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer)
     ZkUtils.setupCommonPaths(zkClient)
     zkClient
   }
 
+
   /**
    *  Forces some dynamic jmx beans to be registered on server startup.
    */

http://git-wip-us.apache.org/repos/asf/kafka/blob/f750dba6/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
new file mode 100644
index 0000000..a0ed485
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import org.scalatest.junit.JUnit3Suite
+import kafka.zk
+import kafka.utils.ZkUtils
+import kafka.utils.Utils
+import kafka.utils.TestUtils
+
+import kafka.zk.ZooKeeperTestHarness
+import junit.framework.Assert._
+
+class ServerStartupTest extends JUnit3Suite with ZooKeeperTestHarness {
+  val brokerId = 0
+  val zookeeperChroot = "/kafka-chroot-for-unittest"
+  var server : KafkaServer = null
+
+  /** Starts a broker whose zookeeper.connect is the default test value plus the chroot. */
+  override def setUp() {
+    super.setUp()
+    val brokerProps = TestUtils.createBrokerConfig(brokerId, TestUtils.choosePort())
+    val baseConnect = brokerProps.get("zookeeper.connect")
+    brokerProps.put("zookeeper.connect", baseConnect + zookeeperChroot)
+    server = TestUtils.createServer(new KafkaConfig(brokerProps))
+  }
+
+  /** Shuts the broker down and removes its log dirs before running the harness tearDown. */
+  override def tearDown() {
+    server.shutdown()
+    Utils.rm(server.config.logDirs)
+    super.tearDown()
+  }
+
+  /** Checks, via the harness-provided zkClient, that the chroot path exists after startup. */
+  def testBrokerCreatesZKChroot {
+    assertTrue(ZkUtils.pathExists(zkClient, zookeeperChroot))
+  }
+}
\ No newline at end of file