You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2014/09/26 06:08:15 UTC
git commit: KAFKA-404 When using chroot path, create chroot on
startup if it doesn't exist; reviewed by Neha Narkhede
Repository: kafka
Updated Branches:
refs/heads/trunk 084566b83 -> f750dba65
KAFKA-404 When using chroot path, create chroot on startup if it doesn't exist; reviewed by Neha Narkhede
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f750dba6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f750dba6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f750dba6
Branch: refs/heads/trunk
Commit: f750dba65f9d9552a61a0754c46fa6e294785b31
Parents: 084566b
Author: Jonathan Creasy <jo...@ghostlab.net>
Authored: Thu Sep 25 13:18:12 2014 -0700
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Thu Sep 25 17:49:29 2014 -0700
----------------------------------------------------------------------
.../main/scala/kafka/server/KafkaServer.scala | 20 +++++++-
.../unit/kafka/server/ServerStartupTest.scala | 54 ++++++++++++++++++++
2 files changed, 73 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/f750dba6/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 390fef5..14a7ba5 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -25,6 +25,7 @@ import kafka.utils._
import java.util.concurrent._
import atomic.{AtomicInteger, AtomicBoolean}
import java.io.File
+import java.net.BindException
import org.I0Itec.zkclient.ZkClient
import kafka.controller.{ControllerStats, KafkaController}
import kafka.cluster.Broker
@@ -126,14 +127,31 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
startupComplete.set(true)
info("started")
}
-
+
private def initZk(): ZkClient = {
info("Connecting to zookeeper on " + config.zkConnect)
+
+ val chroot = {
+ if (config.zkConnect.indexOf("/") > 0)
+ config.zkConnect.substring(config.zkConnect.indexOf("/"))
+ else
+ ""
+ }
+
+ if (chroot.length > 1) {
+ val zkConnForChrootCreation = config.zkConnect.substring(0, config.zkConnect.indexOf("/"))
+ val zkClientForChrootCreation = new ZkClient(zkConnForChrootCreation, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer)
+ ZkUtils.makeSurePersistentPathExists(zkClientForChrootCreation, chroot)
+ info("Created zookeeper path " + chroot)
+ zkClientForChrootCreation.close()
+ }
+
val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer)
ZkUtils.setupCommonPaths(zkClient)
zkClient
}
+
/**
* Forces some dynamic jmx beans to be registered on server startup.
*/
http://git-wip-us.apache.org/repos/asf/kafka/blob/f750dba6/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
new file mode 100644
index 0000000..a0ed485
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import org.scalatest.junit.JUnit3Suite
+import kafka.zk
+import kafka.utils.ZkUtils
+import kafka.utils.Utils
+import kafka.utils.TestUtils
+
+import kafka.zk.ZooKeeperTestHarness
+import junit.framework.Assert._
+
+class ServerStartupTest extends JUnit3Suite with ZooKeeperTestHarness {
+ var server : KafkaServer = null
+ val brokerId = 0
+ val zookeeperChroot = "/kafka-chroot-for-unittest"
+
+ override def setUp() {
+ super.setUp()
+ val props = TestUtils.createBrokerConfig(brokerId, TestUtils.choosePort())
+ val zooKeeperConnect = props.get("zookeeper.connect")
+ props.put("zookeeper.connect", zooKeeperConnect + zookeeperChroot)
+
+ server = TestUtils.createServer(new KafkaConfig(props))
+ }
+
+ override def tearDown() {
+ server.shutdown()
+ Utils.rm(server.config.logDirs)
+ super.tearDown()
+ }
+
+ def testBrokerCreatesZKChroot {
+ val pathExists = ZkUtils.pathExists(zkClient, zookeeperChroot)
+ assertTrue(pathExists)
+ }
+
+}
\ No newline at end of file