You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2019/03/13 09:32:19 UTC

[kylin] branch master updated: KYLIN-3865 Centralize the zookeeper related info

This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new a6aa9c8  KYLIN-3865 Centralize the zookeeper related info
a6aa9c8 is described below

commit a6aa9c812aa7cf93d97fcd8eb554a8c289d1f20d
Author: kyotoYaho <nj...@apache.org>
AuthorDate: Mon Mar 11 18:18:14 2019 +0800

    KYLIN-3865 Centralize the zookeeper related info
---
 .../org/apache/kylin/common/KylinConfigBase.java   |  18 +-
 .../java/org/apache/kylin/common/util/ZKUtil.java  | 217 +++++++++++++++++++++
 .../apache/kylin/common/util/ZooKeeperUtil.java    |  63 ------
 .../kylin/job/impl/curator/CuratorScheduler.java   |  40 ++--
 .../lock/zookeeper}/ZookeeperDistributedLock.java  | 112 +----------
 .../job/lock/zookeeper}/ZookeeperJobLock.java      |   2 +-
 .../impl/curator/CuratorLeaderSelectorTest.java    |   2 +-
 .../job/impl/curator/CuratorSchedulerTest.java     |  23 ++-
 .../kylin/job/impl/curator/ExampleServer.java      |  11 +-
 .../kylin/job/BaseTestDistributedScheduler.java    |  24 +--
 .../kylin/provision/BuildCubeWithEngine.java       |   2 +-
 .../kylin/provision/BuildCubeWithStream.java       |  13 +-
 .../hbase/ITZookeeperDistributedLockTest.java      |   2 +-
 .../org/apache/kylin/rest/service/JobService.java  |   8 +-
 .../kylin/storage/hbase/util/ZookeeperUtil.java    |  34 ----
 15 files changed, 287 insertions(+), 284 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 0a2b9ae..3edd458 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -40,7 +40,6 @@ import org.apache.kylin.common.lock.DistributedLockFactory;
 import org.apache.kylin.common.util.ClassUtil;
 import org.apache.kylin.common.util.CliCommandExecutor;
 import org.apache.kylin.common.util.HadoopUtil;
-import org.apache.kylin.common.util.ZooKeeperUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -351,15 +350,7 @@ public abstract class KylinConfigBase implements Serializable {
      * A comma separated list of host:port pairs, each corresponding to a ZooKeeper server
      */
     public String getZookeeperConnectString() {
-        String str = getOptional("kylin.env.zookeeper-connect-string");
-        if (str != null)
-            return str;
-
-        str = ZooKeeperUtil.getZKConnectStringFromHBase();
-        if (str != null)
-            return str;
-
-        throw new RuntimeException("Please set 'kylin.env.zookeeper-connect-string' in kylin.properties");
+        return getOptional("kylin.env.zookeeper-connect-string");
     }
 
     public int getZKBaseSleepTimeMs() {
@@ -466,7 +457,7 @@ public abstract class KylinConfigBase implements Serializable {
 
     public DistributedLockFactory getDistributedLockFactory() {
         String clsName = getOptional("kylin.metadata.distributed-lock-impl",
-                "org.apache.kylin.storage.hbase.util.ZookeeperDistributedLock$Factory");
+                "org.apache.kylin.job.lock.zookeeper.ZookeeperDistributedLock$Factory");
         return (DistributedLockFactory) ClassUtil.newInstance(clsName);
     }
 
@@ -1735,7 +1726,10 @@ public abstract class KylinConfigBase implements Serializable {
     }
 
     public String getClusterName() {
-        return this.getOptional("kylin.server.cluster-name", getMetadataUrlPrefix());
+        String key = "kylin.server.cluster-name";
+        String clusterName = this.getOptional(key, getMetadataUrlPrefix());
+        setProperty(key, clusterName);
+        return clusterName;
     }
 
     public String getInitTasks() {
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/ZKUtil.java b/core-common/src/main/java/org/apache/kylin/common/util/ZKUtil.java
new file mode 100644
index 0000000..8979022
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/util/ZKUtil.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.Nullable;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.imps.CuratorFrameworkState;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Strings;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import com.google.common.collect.Iterables;
+
+public class ZKUtil {
+    private static final Logger logger = LoggerFactory.getLogger(ZKUtil.class);
+
+    private static final KylinConfig defaultKylinConfig = KylinConfig.getInstanceFromEnv();
+    private static final String zkChRoot = fixPath(defaultKylinConfig.getZookeeperBasePath(),
+            defaultKylinConfig.getClusterName());
+
+    private static String fixPath(String parent, String child) {
+        String path = ZKPaths.makePath(parent, child);
+
+        try {
+            return new File(path).getCanonicalPath();
+        } catch (IOException e) {
+            logger.error("get canonical path failed, use original path", e);
+            return path;
+        }
+    }
+
+    private static Cache<String, CuratorFramework> CACHE = CacheBuilder.newBuilder()
+            .removalListener(new RemovalListener<String, CuratorFramework>() {
+                @Override
+                public void onRemoval(RemovalNotification<String, CuratorFramework> notification) {
+                    logger.info("CuratorFramework for zkString " + notification.getKey() + " is removed due to "
+                            + notification.getCause());
+                    CuratorFramework curator = notification.getValue();
+                    try {
+                        curator.close();
+                    } catch (Exception ex) {
+                        logger.error("Error at closing " + curator, ex);
+                    }
+                }
+            }).expireAfterWrite(1, TimeUnit.DAYS).build();
+
+    static {
+        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+            @Override
+            public void run() {
+                logger.info("Going to remove {} cached curator clients", CACHE.size());
+                CACHE.invalidateAll();
+            }
+        }));
+    }
+
+    /**
+     * Get the zookeeper connect string from kylin.properties, falling back to the HBase configuration if it is not set
+     */
+    public static String getZKConnectString(KylinConfig config) {
+        String zkString = config.getZookeeperConnectString();
+        if (zkString == null) {
+            zkString = getZKConnectStringFromHBase();
+            if (zkString == null) {
+                throw new RuntimeException("Please set 'kylin.env.zookeeper-connect-string' in kylin.properties");
+            }
+        }
+
+        return zkString;
+    }
+
+    public static CuratorFramework getZookeeperClient(KylinConfig config) {
+        RetryPolicy retryPolicy = getRetryPolicy(config);
+        return getZookeeperClient(getZKConnectString(config), retryPolicy);
+    }
+
+    private static CuratorFramework getZookeeperClient(final String zkString, final RetryPolicy retryPolicy) {
+        if (StringUtils.isEmpty(zkString)) {
+            throw new IllegalArgumentException("ZOOKEEPER_QUORUM is empty!");
+        }
+        try {
+            CuratorFramework instance = CACHE.get(zkString, new Callable<CuratorFramework>() {
+                @Override
+                public CuratorFramework call() throws Exception {
+                    return newZookeeperClient(zkString, retryPolicy);
+                }
+            });
+            // during test, curator may be closed by others, remove it from CACHE and reinitialize a new one
+            if (instance.getState() != CuratorFrameworkState.STARTED) {
+                logger.warn("curator for {} is closed by others unexpectedly, reinitialize a new one", zkString);
+                CACHE.invalidate(zkString);
+                instance = getZookeeperClient(zkString, retryPolicy);
+            }
+            return instance;
+        } catch (Throwable e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @VisibleForTesting
+    //no cache
+    public static CuratorFramework newZookeeperClient() {
+        return newZookeeperClient(KylinConfig.getInstanceFromEnv());
+    }
+
+    @VisibleForTesting
+    //no cache
+    public static CuratorFramework newZookeeperClient(KylinConfig config) {
+        RetryPolicy retryPolicy = getRetryPolicy(config);
+        return newZookeeperClient(getZKConnectString(config), retryPolicy);
+    }
+
+    @VisibleForTesting
+    //no cache
+    public static CuratorFramework newZookeeperClient(String zkString, RetryPolicy retryPolicy) {
+        if (zkChRoot == null)
+            throw new NullPointerException("zkChRoot must not be null");
+
+        logger.info("zookeeper connection string: {} with namespace {}", zkString, zkChRoot);
+
+        CuratorFramework instance = getCuratorFramework(zkString, zkChRoot, retryPolicy);
+        instance.start();
+        logger.info("new zookeeper Client start: " + zkString);
+        // create zkChRoot znode if necessary
+        createZkChRootIfNecessary(instance, zkString);
+        return instance;
+    }
+
+    private static RetryPolicy getRetryPolicy(KylinConfig config) {
+        int baseSleepTimeMs = config.getZKBaseSleepTimeMs();
+        int maxRetries = config.getZKMaxRetries();
+        return new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries);
+    }
+
+    private static synchronized void createZkChRootIfNecessary(CuratorFramework instance, String zkString) {
+        try {
+            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+            if (instance.checkExists().forPath("/") == null) {
+                CuratorFramework tmpCurator = getCuratorFramework(zkString, null, retryPolicy);
+                tmpCurator.start();
+                tmpCurator.create().creatingParentsIfNeeded().forPath(zkChRoot);
+                tmpCurator.close();
+            }
+        } catch (KeeperException.NodeExistsException e) {
+            logger.warn("The chRoot znode {} has been created by others", zkChRoot);
+        } catch (Exception e) {
+            throw new RuntimeException("Fail to check or create znode for chRoot " + zkChRoot + " due to ", e);
+        }
+    }
+
+    private static CuratorFramework getCuratorFramework(String zkString, String zkChRoot, RetryPolicy retryPolicy) {
+        if (!Strings.isNullOrEmpty(zkChRoot)) {
+            zkString += zkChRoot;
+        }
+        return CuratorFrameworkFactory.newClient(zkString, 120000, 15000, retryPolicy);
+    }
+
+    private static String getZKConnectStringFromHBase() {
+        Configuration hconf = null;
+        try {
+            Class<? extends Object> hbaseConnClz = ClassUtil.forName("org.apache.kylin.storage.hbase.HBaseConnection",
+                    Object.class);
+            hconf = (Configuration) hbaseConnClz.getMethod("getCurrentHBaseConfiguration").invoke(null);
+        } catch (Throwable ex) {
+            logger.warn("Failed to get zookeeper connect string from HBase configuration", ex);
+            return null;
+        }
+
+        final String serverList = hconf.get("hbase.zookeeper.quorum");
+        final String port = hconf.get("hbase.zookeeper.property.clientPort");
+        return StringUtils
+                .join(Iterables.transform(Arrays.asList(serverList.split(",")), new Function<String, String>() {
+                    @Nullable
+                    @Override
+                    public String apply(String input) {
+                        return input + ":" + port;
+                    }
+                }), ",");
+    }
+}
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/ZooKeeperUtil.java b/core-common/src/main/java/org/apache/kylin/common/util/ZooKeeperUtil.java
deleted file mode 100644
index 48a70b9..0000000
--- a/core-common/src/main/java/org/apache/kylin/common/util/ZooKeeperUtil.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.common.util;
-
-import java.util.Arrays;
-
-import javax.annotation.Nullable;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
-
-/**
- * Use reflection to get zookeeper connect string from HBase configuration.
- */
-public class ZooKeeperUtil {
-    private static final Logger logger = LoggerFactory.getLogger(ZooKeeperUtil.class);
-
-    private ZooKeeperUtil() {
-        throw new IllegalStateException("Class ZooKeeperUtil is an utility class !");
-    }
-
-    public static String getZKConnectStringFromHBase() {
-        Configuration hconf = null;
-        try {
-            Class<? extends Object> hbaseConnClz = ClassUtil.forName("org.apache.kylin.storage.hbase.HBaseConnection", Object.class);
-            hconf = (Configuration) hbaseConnClz.getMethod("getCurrentHBaseConfiguration").invoke(null);
-        } catch (Throwable ex) {
-            logger.warn("Failed to get zookeeper connect string from HBase configuration", ex);
-            return null;
-        }
-        
-        final String serverList = hconf.get("hbase.zookeeper.quorum");
-        final String port = hconf.get("hbase.zookeeper.property.clientPort");
-        return StringUtils.join(Iterables.transform(Arrays.asList(serverList.split(",")), new Function<String, String>() {
-            @Nullable
-            @Override
-            public String apply(String input) {
-                return input + ":" + port;
-            }
-        }), ",");
-    }
-}
diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/curator/CuratorScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/curator/CuratorScheduler.java
index 36be923..ff6e561 100644
--- a/core-job/src/main/java/org/apache/kylin/job/impl/curator/CuratorScheduler.java
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/curator/CuratorScheduler.java
@@ -23,7 +23,6 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.Locale;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -32,9 +31,7 @@ import javax.annotation.Nullable;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.state.ConnectionState;
-import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.curator.x.discovery.ServiceCache;
 import org.apache.curator.x.discovery.ServiceDiscovery;
 import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
@@ -44,6 +41,7 @@ import org.apache.curator.x.discovery.details.ServiceCacheListener;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.ServerMode;
 import org.apache.kylin.common.util.StringUtil;
+import org.apache.kylin.common.util.ZKUtil;
 import org.apache.kylin.job.Scheduler;
 import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.job.exception.SchedulerException;
@@ -55,6 +53,7 @@ import org.slf4j.LoggerFactory;
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.JavaType;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.collect.Lists;
 
@@ -69,33 +68,36 @@ public class CuratorScheduler implements Scheduler<AbstractExecutable> {
     private KylinConfig kylinConfig;
     private AtomicInteger count = new AtomicInteger();
 
-    static final String JOB_ENGINE_LEADER_PATH = "/kylin/%s/job_engine/leader";
-    static final String KYLIN_SERVICE_PATH = "/kylin/%s/service";
+    static final String JOB_ENGINE_LEADER_PATH = "/job_engine/leader";
+    static final String KYLIN_SERVICE_PATH = "/service";
     static final String SERVICE_NAME = "kylin";
     static final String SERVICE_PAYLOAD_DESCRIPTION = "description";
 
+    // the default constructor should exist for reflection initialization
     public CuratorScheduler() {
 
     }
 
+    @VisibleForTesting
+    CuratorScheduler(CuratorFramework curatorClient) {
+        this.curatorClient = curatorClient;
+    }
+
     @Override
     public void init(JobEngineConfig jobEngineConfig, JobLock jobLock) throws SchedulerException {
         kylinConfig = jobEngineConfig.getConfig();
 
-        String zkAddress = kylinConfig.getZookeeperConnectString();
-
         synchronized (this) {
             if (started) {
                 logger.info("CuratorScheduler already started, skipped.");
                 return;
             }
 
-            int baseSleepTimeMs = kylinConfig.getZKBaseSleepTimeMs();
-            int maxRetries = kylinConfig.getZKMaxRetries();
-            curatorClient = CuratorFrameworkFactory.newClient(zkAddress,
-                    new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries));
-            logger.info("New ZK Client start: ", zkAddress);
-            curatorClient.start();
+            // curatorClient is already assigned only in test cases, which create an independent
+            // curator client rather than sharing a cached one, to avoid interference
+            if (curatorClient == null) {
+                curatorClient = ZKUtil.getZookeeperClient(kylinConfig);
+            }
 
             final String restAddress = kylinConfig.getServerRestAddress();
             try {
@@ -104,7 +106,7 @@ public class CuratorScheduler implements Scheduler<AbstractExecutable> {
                 throw new SchedulerException(e);
             }
 
-            String jobEnginePath = getJobEnginePath(slickMetadataPrefix(kylinConfig.getMetadataUrlPrefix()));
+            String jobEnginePath = JOB_ENGINE_LEADER_PATH;
 
             if (ServerMode.isJob(jobEngineConfig.getConfig())) {
                 jobClient = new CuratorLeaderSelector(curatorClient, jobEnginePath, restAddress, jobEngineConfig);
@@ -127,7 +129,7 @@ public class CuratorScheduler implements Scheduler<AbstractExecutable> {
         final String port = restAddress.substring(restAddress.indexOf(":") + 1);
 
         final JsonInstanceSerializer<LinkedHashMap> serializer = new JsonInstanceSerializer<>(LinkedHashMap.class);
-        final String servicePath = getServicePath(slickMetadataPrefix(kylinConfig.getMetadataUrlPrefix()));
+        final String servicePath = KYLIN_SERVICE_PATH;
         serviceDiscovery = ServiceDiscoveryBuilder.builder(LinkedHashMap.class).client(curatorClient)
                 .basePath(servicePath).serializer(serializer).build();
         serviceDiscovery.start();
@@ -179,14 +181,6 @@ public class CuratorScheduler implements Scheduler<AbstractExecutable> {
         serviceDiscovery.registerService(thisInstance);
     }
 
-    static String getJobEnginePath(String metadataUrlPrefix) {
-        return String.format(Locale.ROOT, JOB_ENGINE_LEADER_PATH, metadataUrlPrefix);
-    }
-
-    static String getServicePath(String metadataUrlPrefix) {
-        return String.format(Locale.ROOT, KYLIN_SERVICE_PATH, metadataUrlPrefix);
-    }
-
     private void monitorJobEngine() {
         logger.info("Start collect monitor ZK Participants");
         Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(new Runnable() {
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedLock.java b/core-job/src/main/java/org/apache/kylin/job/lock/zookeeper/ZookeeperDistributedLock.java
similarity index 66%
rename from storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedLock.java
rename to core-job/src/main/java/org/apache/kylin/job/lock/zookeeper/ZookeeperDistributedLock.java
index fffffd7..07b06db 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedLock.java
+++ b/core-job/src/main/java/org/apache/kylin/job/lock/zookeeper/ZookeeperDistributedLock.java
@@ -16,31 +16,24 @@
  * limitations under the License.
 */
 
-package org.apache.kylin.storage.hbase.util;
+package org.apache.kylin.job.lock.zookeeper;
 
 import java.io.Closeable;
-import java.io.File;
-import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.Random;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
 
-import org.apache.curator.RetryPolicy;
 import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.recipes.cache.PathChildrenCache;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
-import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.lock.DistributedLock;
 import org.apache.kylin.common.lock.DistributedLockFactory;
+import org.apache.kylin.common.util.ZKUtil;
 import org.apache.kylin.job.lock.JobLock;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.Shell;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,51 +47,6 @@ public class ZookeeperDistributedLock implements DistributedLock, JobLock {
     private static final Random random = new Random();
 
     public static class Factory extends DistributedLockFactory {
-
-        private static final ConcurrentMap<KylinConfig, CuratorFramework> CACHE = new ConcurrentHashMap<KylinConfig, CuratorFramework>();
-
-        static {
-            Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
-                @Override
-                public void run() {
-                    for (CuratorFramework curator : CACHE.values()) {
-                        try {
-                            curator.close();
-                        } catch (Exception ex) {
-                            logger.error("Error at closing " + curator, ex);
-                        }
-                    }
-                }
-            }));
-        }
-
-        private static CuratorFramework getZKClient(KylinConfig config) {
-            CuratorFramework zkClient = CACHE.get(config);
-            if (zkClient == null) {
-                synchronized (ZookeeperDistributedLock.class) {
-                    zkClient = CACHE.get(config);
-                    if (zkClient == null) {
-                        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
-                        String zkConnectString = getZKConnectString(config);
-                        ZookeeperAclBuilder zookeeperAclBuilder = new ZookeeperAclBuilder().invoke();
-                        zkClient = zookeeperAclBuilder.setZKAclBuilder(CuratorFrameworkFactory.builder()).connectString(zkConnectString).sessionTimeoutMs(120000).connectionTimeoutMs(15000).retryPolicy(retryPolicy).build();
-                        zkClient.start();
-                        CACHE.put(config, zkClient);
-                        if (CACHE.size() > 1) {
-                            logger.warn("More than one singleton exist");
-                        }
-                    }
-                }
-            }
-            return zkClient;
-        }
-
-        private static String getZKConnectString(KylinConfig config) {
-            // the ZKConnectString should come from KylinConfig, however it is taken from HBase configuration at the moment
-            return ZookeeperUtil.getZKConnectString();
-        }
-
-        final String zkPathBase;
         final CuratorFramework curator;
 
         public Factory() {
@@ -106,31 +54,30 @@ public class ZookeeperDistributedLock implements DistributedLock, JobLock {
         }
 
         public Factory(KylinConfig config) {
-            this.curator = getZKClient(config);
-            this.zkPathBase = fixSlash(config.getZookeeperBasePath() + "/" + config.getMetadataUrlPrefix());
+            this(ZKUtil.getZookeeperClient(config));
+        }
+
+        public Factory(CuratorFramework curator) {
+            this.curator = curator;
         }
 
         @Override
         public DistributedLock lockForClient(String client) {
-            return new ZookeeperDistributedLock(curator, zkPathBase, client);
+            return new ZookeeperDistributedLock(curator, client);
         }
     }
 
     // ============================================================================
 
     final CuratorFramework curator;
-    final String zkPathBase;
     final String client;
     final byte[] clientBytes;
 
-    private ZookeeperDistributedLock(CuratorFramework curator, String zkPathBase, String client) {
+    private ZookeeperDistributedLock(CuratorFramework curator, String client) {
         if (client == null)
             throw new NullPointerException("client must not be null");
-        if (zkPathBase == null)
-            throw new NullPointerException("zkPathBase must not be null");
 
         this.curator = curator;
-        this.zkPathBase = zkPathBase;
         this.client = client;
         this.clientBytes = client.getBytes(StandardCharsets.UTF_8);
     }
@@ -142,8 +89,6 @@ public class ZookeeperDistributedLock implements DistributedLock, JobLock {
 
     @Override
     public boolean lock(String lockPath) {
-        lockPath = norm(lockPath);
-
         logger.debug("{} trying to lock {}", client, lockPath);
 
         try {
@@ -166,8 +111,6 @@ public class ZookeeperDistributedLock implements DistributedLock, JobLock {
 
     @Override
     public boolean lock(String lockPath, long timeout) {
-        lockPath = norm(lockPath);
-
         if (lock(lockPath))
             return true;
 
@@ -198,8 +141,6 @@ public class ZookeeperDistributedLock implements DistributedLock, JobLock {
 
     @Override
     public String peekLock(String lockPath) {
-        lockPath = norm(lockPath);
-
         try {
             byte[] bytes = curator.getData().forPath(lockPath);
             return new String(bytes, StandardCharsets.UTF_8);
@@ -222,8 +163,6 @@ public class ZookeeperDistributedLock implements DistributedLock, JobLock {
 
     @Override
     public void unlock(String lockPath) {
-        lockPath = norm(lockPath);
-
         logger.debug("{} trying to unlock {}", client, lockPath);
 
         String owner = peekLock(lockPath);
@@ -244,8 +183,6 @@ public class ZookeeperDistributedLock implements DistributedLock, JobLock {
 
     @Override
     public void purgeLocks(String lockPathRoot) {
-        lockPathRoot = norm(lockPathRoot);
-
         try {
             curator.delete().guaranteed().deletingChildrenIfNeeded().forPath(lockPathRoot);
 
@@ -258,8 +195,6 @@ public class ZookeeperDistributedLock implements DistributedLock, JobLock {
 
     @Override
     public Closeable watchLocks(String lockPathRoot, Executor executor, final Watcher watcher) {
-        lockPathRoot = norm(lockPathRoot);
-
         PathChildrenCache cache = new PathChildrenCache(curator, lockPathRoot, true);
         try {
             cache.start();
@@ -284,35 +219,6 @@ public class ZookeeperDistributedLock implements DistributedLock, JobLock {
         return cache;
     }
 
-    // normalize lock path
-    private String norm(String lockPath) {
-        if (!lockPath.startsWith(zkPathBase))
-            lockPath = zkPathBase + (lockPath.startsWith("/") ? "" : "/") + lockPath;
-        return fixSlash(lockPath);
-    }
-
-    private static String fixSlash(String path) {
-        if (!path.startsWith("/"))
-            path = "/" + path;
-        if (path.endsWith("/"))
-            path = path.substring(0, path.length() - 1);
-        for (int n = Integer.MAX_VALUE; n > path.length();) {
-            n = path.length();
-            path = path.replace("//", "/");
-        }
-
-        if (Shell.WINDOWS) {
-            return new File(path).toURI().getPath();
-        } else {
-            try {
-                return new File(path).getCanonicalPath();
-            } catch (IOException e) {
-                logger.error("get canonical path failed, use original path", e);
-                return path;
-            }
-        }
-    }
-
     // ============================================================================
 
     @Override
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperJobLock.java b/core-job/src/main/java/org/apache/kylin/job/lock/zookeeper/ZookeeperJobLock.java
similarity index 98%
rename from storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperJobLock.java
rename to core-job/src/main/java/org/apache/kylin/job/lock/zookeeper/ZookeeperJobLock.java
index 6e7890b..fcb4f9c 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperJobLock.java
+++ b/core-job/src/main/java/org/apache/kylin/job/lock/zookeeper/ZookeeperJobLock.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.kylin.storage.hbase.util;
+package org.apache.kylin.job.lock.zookeeper;
 
 import java.io.Closeable;
 import java.util.concurrent.Executor;
diff --git a/core-job/src/test/java/org/apache/kylin/job/impl/curator/CuratorLeaderSelectorTest.java b/core-job/src/test/java/org/apache/kylin/job/impl/curator/CuratorLeaderSelectorTest.java
index 45c0129..e8ba0b2 100644
--- a/core-job/src/test/java/org/apache/kylin/job/impl/curator/CuratorLeaderSelectorTest.java
+++ b/core-job/src/test/java/org/apache/kylin/job/impl/curator/CuratorLeaderSelectorTest.java
@@ -52,7 +52,7 @@ public class CuratorLeaderSelectorTest extends LocalFileMetadataTestCase {
         final String zkString = zkTestServer.getConnectString();
         final String server1 = "server1:1111";
         final String server2 = "server2:2222";
-        String jobEnginePath = CuratorScheduler.getJobEnginePath(CuratorScheduler.slickMetadataPrefix(kylinConfig.getMetadataUrlPrefix()));
+        String jobEnginePath = CuratorScheduler.JOB_ENGINE_LEADER_PATH;
         CuratorFramework client = CuratorFrameworkFactory.newClient(zkString, new ExponentialBackoffRetry(3000, 3));
         client.start();
         CuratorLeaderSelector s1 = new CuratorLeaderSelector(client //
diff --git a/core-job/src/test/java/org/apache/kylin/job/impl/curator/CuratorSchedulerTest.java b/core-job/src/test/java/org/apache/kylin/job/impl/curator/CuratorSchedulerTest.java
index dbe5bba..d757ccd 100644
--- a/core-job/src/test/java/org/apache/kylin/job/impl/curator/CuratorSchedulerTest.java
+++ b/core-job/src/test/java/org/apache/kylin/job/impl/curator/CuratorSchedulerTest.java
@@ -17,18 +17,21 @@
  */
 package org.apache.kylin.job.impl.curator;
 
-import com.google.common.base.Function;
-import com.google.common.collect.Lists;
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.List;
+
+import javax.annotation.Nullable;
+
 import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.curator.test.TestingServer;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.x.discovery.ServiceDiscovery;
 import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
 import org.apache.curator.x.discovery.ServiceInstance;
-import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.common.util.ZKUtil;
 import org.apache.kylin.job.execution.ExecutableManager;
 import org.junit.After;
 import org.junit.Assert;
@@ -37,10 +40,8 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nullable;
-import java.util.Collection;
-import java.util.LinkedHashMap;
-import java.util.List;
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
 
 /**
  */
@@ -75,7 +76,6 @@ public class CuratorSchedulerTest extends LocalFileMetadataTestCase {
     public void test() throws Exception {
 
         final String zkString = zkTestServer.getConnectString();
-        final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
 
         ServiceDiscovery<LinkedHashMap> serviceDiscovery = null;
         CuratorFramework curatorClient = null;
@@ -83,9 +83,8 @@ public class CuratorSchedulerTest extends LocalFileMetadataTestCase {
 
             final CuratorScheduler.JsonInstanceSerializer<LinkedHashMap> serializer = new CuratorScheduler.JsonInstanceSerializer<>(
                     LinkedHashMap.class);
-            String servicePath = CuratorScheduler.getServicePath(CuratorScheduler.slickMetadataPrefix(kylinConfig.getMetadataUrlPrefix()));
-            curatorClient = CuratorFrameworkFactory.newClient(zkString, new ExponentialBackoffRetry(3000, 3));
-            curatorClient.start();
+            String servicePath = CuratorScheduler.KYLIN_SERVICE_PATH;
+            curatorClient = ZKUtil.newZookeeperClient(zkString, new ExponentialBackoffRetry(3000, 3));
             serviceDiscovery = ServiceDiscoveryBuilder.builder(LinkedHashMap.class).client(curatorClient)
                     .basePath(servicePath).serializer(serializer).build();
             serviceDiscovery.start();
diff --git a/core-job/src/test/java/org/apache/kylin/job/impl/curator/ExampleServer.java b/core-job/src/test/java/org/apache/kylin/job/impl/curator/ExampleServer.java
index a6df81e..66e3832 100644
--- a/core-job/src/test/java/org/apache/kylin/job/impl/curator/ExampleServer.java
+++ b/core-job/src/test/java/org/apache/kylin/job/impl/curator/ExampleServer.java
@@ -18,14 +18,16 @@
 
 package org.apache.kylin.job.impl.curator;
 
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.curator.framework.CuratorFramework;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.ZKUtil;
 import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.job.exception.SchedulerException;
 import org.apache.kylin.job.lock.MockJobLock;
 
-import java.io.Closeable;
-import java.io.IOException;
-
 /**
  */
 public class ExampleServer implements Closeable {
@@ -40,7 +42,8 @@ public class ExampleServer implements Closeable {
         KylinConfig kylinConfig1 = KylinConfig.createKylinConfig(kylinConfig);
         kylinConfig1.setProperty("kylin.server.host-address", address);
 
-        scheduler = new CuratorScheduler();
+        CuratorFramework client = ZKUtil.newZookeeperClient(kylinConfig1);
+        scheduler = new CuratorScheduler(client);
         scheduler.init(new JobEngineConfig(kylinConfig1), new MockJobLock());
         if (!scheduler.hasStarted()) {
             throw new RuntimeException("scheduler has not been started");
diff --git a/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java b/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java
index 5a2091d..6c6a36e 100644
--- a/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java
+++ b/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java
@@ -22,22 +22,18 @@ import java.io.File;
 import java.nio.charset.Charset;
 
 import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.curator.RetryPolicy;
 import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.imps.CuratorFrameworkState;
-import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.HBaseMetadataTestCase;
 import org.apache.kylin.common.util.RandomUtil;
+import org.apache.kylin.common.util.ZKUtil;
 import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableManager;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.impl.threadpool.DistributedScheduler;
-import org.apache.kylin.storage.hbase.util.ZookeeperDistributedLock;
-import org.apache.kylin.storage.hbase.util.ZookeeperUtil;
+import org.apache.kylin.job.lock.zookeeper.ZookeeperDistributedLock;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.slf4j.Logger;
@@ -111,7 +107,7 @@ public class BaseTestDistributedScheduler extends HBaseMetadataTestCase {
 
     @AfterClass
     public static void after() throws Exception {
-        jobLock1.purgeLocks("");
+        jobLock1.purgeLocks(DistributedScheduler.ZOOKEEPER_LOCK_PATH);
         
         if (scheduler1 != null) {
             scheduler1.shutdown();
@@ -167,17 +163,11 @@ public class BaseTestDistributedScheduler extends HBaseMetadataTestCase {
     }
 
     private static void initZk() {
-        String zkConnectString = ZookeeperUtil.getZKConnectString();
-        if (StringUtils.isEmpty(zkConnectString)) {
-            throw new IllegalArgumentException("ZOOKEEPER_QUORUM is empty!");
-        }
-        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
-        zkClient = CuratorFrameworkFactory.newClient(zkConnectString, retryPolicy);
-        zkClient.start();
+        zkClient = ZKUtil.newZookeeperClient();
     }
 
     String getServerName(String segName) {
-        String lockPath = getFullLockPath(segName);
+        String lockPath = DistributedScheduler.getLockPath(segName);
         String serverName = null;
         if (zkClient.getState().equals(CuratorFrameworkState.STARTED)) {
             try {
@@ -191,8 +181,4 @@ public class BaseTestDistributedScheduler extends HBaseMetadataTestCase {
         }
         return serverName;
     }
-
-    private String getFullLockPath(String segName) {
-        return DistributedScheduler.dropDoubleSlash("/kylin/" + kylinConfig1.getMetadataUrlPrefix() + DistributedScheduler.getLockPath(segName));
-    }
 }
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
index ec5bc35..bd3e7c5 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
@@ -70,7 +70,7 @@ import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.rest.job.StorageCleanupJob;
 import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.apache.kylin.storage.hbase.util.HBaseRegionSizeCalculator;
-import org.apache.kylin.storage.hbase.util.ZookeeperJobLock;
+import org.apache.kylin.job.lock.zookeeper.ZookeeperJobLock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
index 2153821..5b6f26a 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
@@ -36,15 +36,13 @@ import java.util.concurrent.TimeUnit;
 
 import org.I0Itec.zkclient.ZkConnection;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.curator.RetryPolicy;
 import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.ClassUtil;
 import org.apache.kylin.common.util.HBaseMetadataTestCase;
 import org.apache.kylin.common.util.RandomUtil;
+import org.apache.kylin.common.util.ZKUtil;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
@@ -56,6 +54,7 @@ import org.apache.kylin.job.execution.DefaultChainedExecutable;
 import org.apache.kylin.job.execution.ExecutableManager;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
+import org.apache.kylin.job.lock.zookeeper.ZookeeperJobLock;
 import org.apache.kylin.job.streaming.Kafka10DataLoader;
 import org.apache.kylin.metadata.model.SegmentRange;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
@@ -68,8 +67,6 @@ import org.apache.kylin.source.SourcePartition;
 import org.apache.kylin.source.kafka.KafkaConfigManager;
 import org.apache.kylin.source.kafka.config.BrokerConfig;
 import org.apache.kylin.source.kafka.config.KafkaConfig;
-import org.apache.kylin.storage.hbase.util.ZookeeperJobLock;
-import org.apache.kylin.storage.hbase.util.ZookeeperUtil;
 import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -139,7 +136,7 @@ public class BuildCubeWithStream {
 
     private void startEmbeddedKafka(String topicName, BrokerConfig brokerConfig) {
         //Start mock Kakfa
-        String zkConnectionStr = ZookeeperUtil.getZKConnectString() + kafkaZkPath;
+        String zkConnectionStr = ZKUtil.getZKConnectString(KylinConfig.getInstanceFromEnv()) + kafkaZkPath;
         System.out.println("zkConnectionStr" + zkConnectionStr);
         zkConnection = new ZkConnection(zkConnectionStr);
         // Assert.assertEquals(ZooKeeper.States.CONNECTED, zkConnection.getZookeeperState());
@@ -333,9 +330,7 @@ public class BuildCubeWithStream {
     }
 
     private void cleanKafkaZkPath(String path) {
-        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
-        CuratorFramework zkClient = CuratorFrameworkFactory.newClient(ZookeeperUtil.getZKConnectString(), retryPolicy);
-        zkClient.start();
+        CuratorFramework zkClient = ZKUtil.newZookeeperClient();
 
         try {
             zkClient.delete().deletingChildrenIfNeeded().forPath(kafkaZkPath);
diff --git a/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITZookeeperDistributedLockTest.java b/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITZookeeperDistributedLockTest.java
index 48d6736..e369ff0 100644
--- a/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITZookeeperDistributedLockTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITZookeeperDistributedLockTest.java
@@ -31,7 +31,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.kylin.common.lock.DistributedLock;
 import org.apache.kylin.common.lock.DistributedLock.Watcher;
 import org.apache.kylin.common.util.HBaseMetadataTestCase;
-import org.apache.kylin.storage.hbase.util.ZookeeperDistributedLock;
+import org.apache.kylin.job.lock.zookeeper.ZookeeperDistributedLock;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
index 4f15d9e..31a4119 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -76,7 +76,7 @@ import org.apache.kylin.rest.util.AclEvaluate;
 import org.apache.kylin.source.ISource;
 import org.apache.kylin.source.SourceManager;
 import org.apache.kylin.source.SourcePartition;
-import org.apache.kylin.storage.hbase.util.ZookeeperJobLock;
+import org.apache.kylin.job.lock.zookeeper.ZookeeperJobLock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.InitializingBean;
@@ -120,6 +120,12 @@ public class JobService extends BasicService implements InitializingBean {
         TimeZone.setDefault(tzone);
 
         final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+
+        // In case kylin.server.cluster-name is not set,
+        // this method has to be called first to avoid being influenced by a change of kylin.metadata.url
+        String clusterName = kylinConfig.getClusterName();
+        logger.info("starting to initialize an instance in cluster {}", clusterName);
+
         final Scheduler<AbstractExecutable> scheduler = (Scheduler<AbstractExecutable>) SchedulerFactory
                 .scheduler(kylinConfig.getSchedulerType());
 
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperUtil.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperUtil.java
deleted file mode 100644
index 20569d3..0000000
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperUtil.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.storage.hbase.util;
-
-import org.apache.kylin.common.KylinConfig;
-
-public class ZookeeperUtil {
-
-    public static String ZOOKEEPER_UTIL_HBASE_CLASSNAME = "org.apache.kylin.storage.hbase.util.ZooKeeperUtilHbase";
-
-    /**
-     * Get zookeeper connection string from HBase Configuration or from kylin.properties
-     */
-    public static String getZKConnectString() {
-        KylinConfig config = KylinConfig.getInstanceFromEnv();
-        return config.getZookeeperConnectString();
-    }
-}
\ No newline at end of file