You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2017/02/10 04:02:42 UTC

[6/9] kylin git commit: fix some testcases which can not be run concurrently

fix some testcases which can not be run concurrently

Signed-off-by: lidongsjtu <li...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/f2e8b690
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/f2e8b690
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/f2e8b690

Branch: refs/heads/master-hbase0.98
Commit: f2e8b690f186abe5048dee8a6b0339c2a28c0594
Parents: 3ee4946
Author: etherge <et...@163.com>
Authored: Wed Feb 8 23:50:35 2017 +0800
Committer: lidongsjtu <li...@apache.org>
Committed: Thu Feb 9 22:05:11 2017 +0800

----------------------------------------------------------------------
 .../java/org/apache/kylin/job/DeployUtil.java   | 16 +++---
 .../apache/kylin/common/KylinConfigBase.java    |  7 ++-
 .../apache/kylin/common/KylinConfigTest.java    | 17 ++++++-
 .../apache/kylin/dict/CachedTreeMapTest.java    |  7 ++-
 .../kylin/job/BaseTestDistributedScheduler.java | 29 ++---------
 .../kylin/provision/BuildCubeWithStream.java    | 32 ++++++++++--
 .../org/apache/kylin/provision/MockKafka.java   | 11 ++---
 .../hbase/util/ZookeeperDistributedJobLock.java | 24 +--------
 .../kylin/storage/hbase/util/ZookeeperUtil.java | 52 ++++++++++++++++++++
 9 files changed, 124 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/f2e8b690/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
index e8c7fae..fdcd52c 100644
--- a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
+++ b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
@@ -56,6 +56,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Lists;
+import com.google.common.io.Files;
 
 public class DeployUtil {
     private static final Logger logger = LoggerFactory.getLogger(DeployUtil.class);
@@ -139,7 +140,7 @@ public class DeployUtil {
         boolean buildCubeUsingProvidedData = Boolean.parseBoolean(System.getProperty("buildCubeUsingProvidedData"));
         if (!buildCubeUsingProvidedData) {
             System.out.println("build cube with random dataset");
-            
+
             // data is generated according to cube descriptor and saved in resource store
             MetadataManager mgr = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
             DataModelDesc model = mgr.getDataModelDesc(modelName);
@@ -208,12 +209,12 @@ public class DeployUtil {
         MetadataManager metaMgr = MetadataManager.getInstance(config());
 
         // scp data files, use the data from hbase, instead of local files
-        File temp = File.createTempFile("temp", ".csv");
-        temp.createNewFile();
+        File tempDir = Files.createTempDir();
+        String tempDirAbsPath = tempDir.getAbsolutePath();
         for (String tablename : TABLE_NAMES) {
             tablename = tablename.toUpperCase();
 
-            File localBufferFile = new File(temp.getParent() + "/" + tablename + ".csv");
+            File localBufferFile = new File(tempDirAbsPath + "/" + tablename + ".csv");
             localBufferFile.createNewFile();
 
             InputStream hbaseDataStream = metaMgr.getStore().getResource("/data/" + tablename + ".csv").inputStream;
@@ -225,8 +226,7 @@ public class DeployUtil {
 
             localBufferFile.deleteOnExit();
         }
-        String tableFileDir = temp.getParent();
-        temp.delete();
+        tempDir.deleteOnExit();
 
         IHiveClient hiveClient = HiveClientFactory.getHiveClient();
         // create hive tables
@@ -238,7 +238,7 @@ public class DeployUtil {
         // load data to hive tables
         // LOAD DATA LOCAL INPATH 'filepath' [OVERWRITE] INTO TABLE tablename
         for (String tablename : TABLE_NAMES) {
-            hiveClient.executeHQL(generateLoadDataHql(tablename.toUpperCase(), tableFileDir));
+            hiveClient.executeHQL(generateLoadDataHql(tablename.toUpperCase(), tempDirAbsPath));
         }
 
         final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
@@ -255,7 +255,7 @@ public class DeployUtil {
 
         String dropsql = "DROP TABLE IF EXISTS " + tableDesc.getIdentity();
         String dropsql2 = "DROP VIEW IF EXISTS " + tableDesc.getIdentity();
-        
+
         StringBuilder ddl = new StringBuilder();
         ddl.append("CREATE TABLE " + tableDesc.getIdentity() + "\n");
         ddl.append("(" + "\n");

http://git-wip-us.apache.org/repos/asf/kylin/blob/f2e8b690/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index ebd9dfc..dce4149 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -216,12 +216,11 @@ abstract public class KylinConfigBase implements Serializable {
         String metadataUrl = getMetadataUrl();
         String defaultPrefix = "kylin_metadata";
 
-        if (metadataUrl.endsWith("@hbase")) {
-            int cut = metadataUrl.lastIndexOf('@');
+        int cut = metadataUrl.lastIndexOf('@');
+        if (cut > 0) {
             return metadataUrl.substring(0, cut);
-        } else {
-            return defaultPrefix;
         }
+        return defaultPrefix;
     }
 
     public String[] getRealizationProviders() {

http://git-wip-us.apache.org/repos/asf/kylin/blob/f2e8b690/core-common/src/test/java/org/apache/kylin/common/KylinConfigTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/KylinConfigTest.java b/core-common/src/test/java/org/apache/kylin/common/KylinConfigTest.java
index 4d5f130..7e4b444 100644
--- a/core-common/src/test/java/org/apache/kylin/common/KylinConfigTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/KylinConfigTest.java
@@ -31,7 +31,7 @@ import org.junit.Test;
 
 import com.google.common.collect.Maps;
 
-public class KylinConfigTest extends HotLoadKylinPropertiesTestCase{
+public class KylinConfigTest extends HotLoadKylinPropertiesTestCase {
     @Test
     public void testMRConfigOverride() {
         KylinConfig config = KylinConfig.getInstanceFromEnv();
@@ -81,4 +81,19 @@ public class KylinConfigTest extends HotLoadKylinPropertiesTestCase{
 
         assertEquals("kylin@kylin.apache.org", config.getKylinOwner());
     }
+
+    @Test
+    public void testGetMetadataUrlPrefix() {
+        KylinConfig config = KylinConfig.getInstanceFromEnv();
+        final String default_metadata_prefix = "kylin_metadata";
+        
+        config.setMetadataUrl("testMetaPrefix@hbase");
+        assertEquals("testMetaPrefix", config.getMetadataUrlPrefix());
+
+        config.setMetadataUrl("testMetaPrefix@hdfs");
+        assertEquals("testMetaPrefix", config.getMetadataUrlPrefix());
+
+        config.setMetadataUrl("/kylin/temp");
+        assertEquals(default_metadata_prefix, config.getMetadataUrlPrefix());
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/f2e8b690/core-dictionary/src/test/java/org/apache/kylin/dict/CachedTreeMapTest.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/CachedTreeMapTest.java b/core-dictionary/src/test/java/org/apache/kylin/dict/CachedTreeMapTest.java
index ccf6e24..3c29d9c 100644
--- a/core-dictionary/src/test/java/org/apache/kylin/dict/CachedTreeMapTest.java
+++ b/core-dictionary/src/test/java/org/apache/kylin/dict/CachedTreeMapTest.java
@@ -30,6 +30,7 @@ import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileFilter;
 import java.io.IOException;
+import java.util.UUID;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -114,8 +115,10 @@ public class CachedTreeMapTest {
         }
     }
 
-    public static final String baseDir = "/tmp/kylin_cachedtreemap_test/";
-    public static final String workingDir = "/tmp/kylin_cachedtreemap_test/working";
+
+    static final UUID uuid = UUID.randomUUID();
+    static final String baseDir = "/tmp/kylin_cachedtreemap_test/" + uuid;
+    static final String workingDir = baseDir + "/working";
 
     private static void cleanup() {
         Path basePath = new Path(baseDir);

http://git-wip-us.apache.org/repos/asf/kylin/blob/f2e8b690/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java b/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java
index 2f37a50..2d79970 100644
--- a/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java
+++ b/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java
@@ -20,9 +20,7 @@ package org.apache.kylin.job;
 
 import java.io.File;
 import java.nio.charset.Charset;
-import java.util.Arrays;
-
-import javax.annotation.Nullable;
+import java.util.UUID;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -31,8 +29,6 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.imps.CuratorFrameworkState;
 import org.apache.curator.retry.ExponentialBackoffRetry;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.HBaseMetadataTestCase;
 import org.apache.kylin.job.engine.JobEngineConfig;
@@ -40,15 +36,13 @@ import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableManager;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.impl.threadpool.DistributedScheduler;
-import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.apache.kylin.storage.hbase.util.ZookeeperDistributedJobLock;
+import org.apache.kylin.storage.hbase.util.ZookeeperUtil;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
 import com.google.common.io.Files;
 
 public class BaseTestDistributedScheduler extends HBaseMetadataTestCase {
@@ -62,8 +56,8 @@ public class BaseTestDistributedScheduler extends HBaseMetadataTestCase {
     static File localMetaDir;
 
     static final String SEGMENT_ID = "segmentId";
-    static final String segmentId1 = "segmentId1";
-    static final String segmentId2 = "segmentId2";
+    static final String segmentId1 = "seg1" + UUID.randomUUID();
+    static final String segmentId2 = "seg2" + UUID.randomUUID();
     static final String serverName1 = "serverName1";
     static final String serverName2 = "serverName2";
     static final String confDstPath1 = "target/kylin_metadata_dist_lock_test1/kylin.properties";
@@ -177,7 +171,7 @@ public class BaseTestDistributedScheduler extends HBaseMetadataTestCase {
     }
 
     private static void initZk() {
-        String zkConnectString = getZKConnectString();
+        String zkConnectString = ZookeeperUtil.getZKConnectString();
         if (StringUtils.isEmpty(zkConnectString)) {
             throw new IllegalArgumentException("ZOOKEEPER_QUORUM is empty!");
         }
@@ -186,19 +180,6 @@ public class BaseTestDistributedScheduler extends HBaseMetadataTestCase {
         zkClient.start();
     }
 
-    private static String getZKConnectString() {
-        Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
-        final String serverList = conf.get(HConstants.ZOOKEEPER_QUORUM);
-        final String port = conf.get(HConstants.ZOOKEEPER_CLIENT_PORT);
-        return StringUtils.join(Iterables.transform(Arrays.asList(serverList.split(",")), new Function<String, String>() {
-            @Nullable
-            @Override
-            public String apply(String input) {
-                return input + ":" + port;
-            }
-        }), ",");
-    }
-
     String getServerName(String cubeName) {
         String lockPath = getLockPath(cubeName);
         String serverName = null;

http://git-wip-us.apache.org/repos/asf/kylin/blob/f2e8b690/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
index 53c89cf..f3b1ec9 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
@@ -36,6 +36,10 @@ import java.util.concurrent.TimeUnit;
 
 import org.I0Itec.zkclient.ZkConnection;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.ClassUtil;
@@ -63,6 +67,7 @@ import org.apache.kylin.source.kafka.KafkaConfigManager;
 import org.apache.kylin.source.kafka.config.BrokerConfig;
 import org.apache.kylin.source.kafka.config.KafkaConfig;
 import org.apache.kylin.storage.hbase.util.ZookeeperJobLock;
+import org.apache.kylin.storage.hbase.util.ZookeeperUtil;
 import org.apache.kylin.tool.StorageCleanupJob;
 import org.junit.Assert;
 import org.slf4j.Logger;
@@ -84,6 +89,9 @@ public class BuildCubeWithStream {
 
     private KafkaConfig kafkaConfig;
     private MockKafka kafkaServer;
+    private ZkConnection zkConnection;
+    private final String kafkaZkPath = "/" + UUID.randomUUID().toString();
+
     protected static boolean fastBuildMode = false;
     private boolean generateData = true;
 
@@ -128,8 +136,9 @@ public class BuildCubeWithStream {
 
     private void startEmbeddedKafka(String topicName, BrokerConfig brokerConfig) {
         //Start mock Kakfa
-        String zkConnectionStr = "sandbox:2181";
-        ZkConnection zkConnection = new ZkConnection(zkConnectionStr);
+        String zkConnectionStr = ZookeeperUtil.getZKConnectString() + kafkaZkPath;
+        System.out.println("zkConnectionStr" + zkConnectionStr);
+        zkConnection = new ZkConnection(zkConnectionStr);
         // Assert.assertEquals(ZooKeeper.States.CONNECTED, zkConnection.getZookeeperState());
         kafkaServer = new MockKafka(zkConnection, brokerConfig.getPort(), brokerConfig.getId());
         kafkaServer.start();
@@ -287,9 +296,24 @@ public class BuildCubeWithStream {
 
     public void after() {
         kafkaServer.stop();
+        cleanKafkaZkPath(kafkaZkPath);
         DefaultScheduler.destroyInstance();
     }
 
+    private void cleanKafkaZkPath(String path) {
+        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+        CuratorFramework zkClient = CuratorFrameworkFactory.newClient(ZookeeperUtil.getZKConnectString(), retryPolicy);
+        zkClient.start();
+
+        try {
+            zkClient.delete().deletingChildrenIfNeeded().forPath(kafkaZkPath);
+        } catch (Exception e) {
+            logger.warn("Failed to delete zookeeper path: " + path, e);
+        } finally {
+            zkClient.close();
+        }
+    }
+
     protected void waitForJob(String jobId) {
         while (true) {
             AbstractExecutable job = jobService.getJob(jobId);
@@ -327,6 +351,8 @@ public class BuildCubeWithStream {
             buildCubeWithStream.before();
             buildCubeWithStream.build();
             logger.info("Build is done");
+
+            buildCubeWithStream.after();
             buildCubeWithStream.cleanup();
             logger.info("Going to exit");
         } catch (Throwable e) {
@@ -336,7 +362,7 @@ public class BuildCubeWithStream {
 
         long millis = System.currentTimeMillis() - start;
         System.out.println("Time elapsed: " + (millis / 1000) + " sec - in " + BuildCubeWithStream.class.getName());
-        
+
         System.exit(exitCode);
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/f2e8b690/kylin-it/src/test/java/org/apache/kylin/provision/MockKafka.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/MockKafka.java b/kylin-it/src/test/java/org/apache/kylin/provision/MockKafka.java
index 3f47923..fce422a 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/MockKafka.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/MockKafka.java
@@ -29,6 +29,8 @@ import org.I0Itec.zkclient.ZkConnection;
 import org.I0Itec.zkclient.exception.ZkMarshallingError;
 import org.I0Itec.zkclient.serialize.ZkSerializer;
 import org.apache.kafka.common.requests.MetadataResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import kafka.admin.AdminUtils;
 import kafka.server.KafkaConfig;
@@ -52,6 +54,7 @@ public class MockKafka {
     }
 
     private KafkaServerStartable kafkaServer;
+    private static final Logger logger = LoggerFactory.getLogger(MockKafka.class);
 
     private ZkConnection zkConnection;
 
@@ -67,7 +70,7 @@ public class MockKafka {
 
     public MockKafka(ZkConnection zkServerConnection, int port, int brokerId) {
         this(zkServerConnection, System.getProperty("java.io.tmpdir") + "/" + UUID.randomUUID().toString(), String.valueOf(port), String.valueOf(brokerId));
-        start();
+        //start();
     }
 
     private MockKafka(ZkConnection zkServerConnection, String logDir, String port, String brokerId) {
@@ -110,13 +113,9 @@ public class MockKafka {
         zkClient.close();
     }
 
-    public String getConnectionString() {
-        return String.format("%s:%d", kafkaServer.serverConfig().hostName(), kafkaServer.serverConfig().port());
-    }
-
     public void start() {
         kafkaServer.startup();
-        System.out.println("embedded kafka is up");
+        System.out.println("--embedded kafka is up");
     }
 
     public void stop() {

http://git-wip-us.apache.org/repos/asf/kylin/blob/f2e8b690/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java
index ee7cd50..983bfd9 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java
@@ -19,11 +19,8 @@
 package org.apache.kylin.storage.hbase.util;
 
 import java.nio.charset.Charset;
-import java.util.Arrays;
 import java.util.concurrent.ExecutorService;
 
-import javax.annotation.Nullable;
-
 import org.apache.commons.lang3.StringUtils;
 import org.apache.curator.RetryPolicy;
 import org.apache.curator.framework.CuratorFramework;
@@ -33,18 +30,12 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCache;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
 import org.apache.curator.retry.ExponentialBackoffRetry;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.job.lock.DistributedJobLock;
-import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.apache.zookeeper.CreateMode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
-
 /**
  * the jobLock is specially used to support distributed scheduler.
  */
@@ -65,7 +56,7 @@ public class ZookeeperDistributedJobLock implements DistributedJobLock {
     public ZookeeperDistributedJobLock(KylinConfig config) {
         this.config = config;
 
-        String zkConnectString = getZKConnectString();
+        String zkConnectString = ZookeeperUtil.getZKConnectString();
         logger.info("zk connection string:" + zkConnectString);
         if (StringUtils.isEmpty(zkConnectString)) {
             throw new IllegalArgumentException("ZOOKEEPER_QUORUM is empty!");
@@ -243,19 +234,6 @@ public class ZookeeperDistributedJobLock implements DistributedJobLock {
         }
     }
 
-    private static String getZKConnectString() {
-        Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
-        final String serverList = conf.get(HConstants.ZOOKEEPER_QUORUM);
-        final String port = conf.get(HConstants.ZOOKEEPER_CLIENT_PORT);
-        return StringUtils.join(Iterables.transform(Arrays.asList(serverList.split(",")), new Function<String, String>() {
-            @Nullable
-            @Override
-            public String apply(String input) {
-                return input + ":" + port;
-            }
-        }), ",");
-    }
-
     private String getLockPath(String pathName) {
         return ZOOKEEPER_LOCK_PATH + "/" + config.getMetadataUrlPrefix() + "/" + pathName;
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/f2e8b690/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperUtil.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperUtil.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperUtil.java
new file mode 100644
index 0000000..b5ebe89
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperUtil.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.util;
+
+import java.util.Arrays;
+
+import javax.annotation.Nullable;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.kylin.storage.hbase.HBaseConnection;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+
+public class ZookeeperUtil {
+
+    /**
+     * Get zookeeper connection string from HBase Configuration
+     *
+     * @return Zookeeper Connection string
+     */
+    public static String getZKConnectString() {
+        Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
+        final String serverList = conf.get(HConstants.ZOOKEEPER_QUORUM);
+        final String port = conf.get(HConstants.ZOOKEEPER_CLIENT_PORT);
+        return StringUtils.join(Iterables.transform(Arrays.asList(serverList.split(",")), new Function<String, String>() {
+            @Nullable
+            @Override
+            public String apply(String input) {
+                return input + ":" + port;
+            }
+        }), ",");
+    }
+}