You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2017/02/09 14:40:33 UTC
kylin git commit: fix some testcases which can not be run concurrently
Repository: kylin
Updated Branches:
refs/heads/master 3ee49467c -> f2e8b690f
fix some testcases which can not be run concurrently
Signed-off-by: lidongsjtu <li...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/f2e8b690
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/f2e8b690
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/f2e8b690
Branch: refs/heads/master
Commit: f2e8b690f186abe5048dee8a6b0339c2a28c0594
Parents: 3ee4946
Author: etherge <et...@163.com>
Authored: Wed Feb 8 23:50:35 2017 +0800
Committer: lidongsjtu <li...@apache.org>
Committed: Thu Feb 9 22:05:11 2017 +0800
----------------------------------------------------------------------
.../java/org/apache/kylin/job/DeployUtil.java | 16 +++---
.../apache/kylin/common/KylinConfigBase.java | 7 ++-
.../apache/kylin/common/KylinConfigTest.java | 17 ++++++-
.../apache/kylin/dict/CachedTreeMapTest.java | 7 ++-
.../kylin/job/BaseTestDistributedScheduler.java | 29 ++---------
.../kylin/provision/BuildCubeWithStream.java | 32 ++++++++++--
.../org/apache/kylin/provision/MockKafka.java | 11 ++---
.../hbase/util/ZookeeperDistributedJobLock.java | 24 +--------
.../kylin/storage/hbase/util/ZookeeperUtil.java | 52 ++++++++++++++++++++
9 files changed, 124 insertions(+), 71 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/f2e8b690/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
index e8c7fae..fdcd52c 100644
--- a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
+++ b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
@@ -56,6 +56,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
+import com.google.common.io.Files;
public class DeployUtil {
private static final Logger logger = LoggerFactory.getLogger(DeployUtil.class);
@@ -139,7 +140,7 @@ public class DeployUtil {
boolean buildCubeUsingProvidedData = Boolean.parseBoolean(System.getProperty("buildCubeUsingProvidedData"));
if (!buildCubeUsingProvidedData) {
System.out.println("build cube with random dataset");
-
+
// data is generated according to cube descriptor and saved in resource store
MetadataManager mgr = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
DataModelDesc model = mgr.getDataModelDesc(modelName);
@@ -208,12 +209,12 @@ public class DeployUtil {
MetadataManager metaMgr = MetadataManager.getInstance(config());
// scp data files, use the data from hbase, instead of local files
- File temp = File.createTempFile("temp", ".csv");
- temp.createNewFile();
+ File tempDir = Files.createTempDir();
+ String tempDirAbsPath = tempDir.getAbsolutePath();
for (String tablename : TABLE_NAMES) {
tablename = tablename.toUpperCase();
- File localBufferFile = new File(temp.getParent() + "/" + tablename + ".csv");
+ File localBufferFile = new File(tempDirAbsPath + "/" + tablename + ".csv");
localBufferFile.createNewFile();
InputStream hbaseDataStream = metaMgr.getStore().getResource("/data/" + tablename + ".csv").inputStream;
@@ -225,8 +226,7 @@ public class DeployUtil {
localBufferFile.deleteOnExit();
}
- String tableFileDir = temp.getParent();
- temp.delete();
+ tempDir.deleteOnExit();
IHiveClient hiveClient = HiveClientFactory.getHiveClient();
// create hive tables
@@ -238,7 +238,7 @@ public class DeployUtil {
// load data to hive tables
// LOAD DATA LOCAL INPATH 'filepath' [OVERWRITE] INTO TABLE tablename
for (String tablename : TABLE_NAMES) {
- hiveClient.executeHQL(generateLoadDataHql(tablename.toUpperCase(), tableFileDir));
+ hiveClient.executeHQL(generateLoadDataHql(tablename.toUpperCase(), tempDirAbsPath));
}
final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
@@ -255,7 +255,7 @@ public class DeployUtil {
String dropsql = "DROP TABLE IF EXISTS " + tableDesc.getIdentity();
String dropsql2 = "DROP VIEW IF EXISTS " + tableDesc.getIdentity();
-
+
StringBuilder ddl = new StringBuilder();
ddl.append("CREATE TABLE " + tableDesc.getIdentity() + "\n");
ddl.append("(" + "\n");
http://git-wip-us.apache.org/repos/asf/kylin/blob/f2e8b690/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index ebd9dfc..dce4149 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -216,12 +216,11 @@ abstract public class KylinConfigBase implements Serializable {
String metadataUrl = getMetadataUrl();
String defaultPrefix = "kylin_metadata";
- if (metadataUrl.endsWith("@hbase")) {
- int cut = metadataUrl.lastIndexOf('@');
+ int cut = metadataUrl.lastIndexOf('@');
+ if (cut > 0) {
return metadataUrl.substring(0, cut);
- } else {
- return defaultPrefix;
}
+ return defaultPrefix;
}
public String[] getRealizationProviders() {
http://git-wip-us.apache.org/repos/asf/kylin/blob/f2e8b690/core-common/src/test/java/org/apache/kylin/common/KylinConfigTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/KylinConfigTest.java b/core-common/src/test/java/org/apache/kylin/common/KylinConfigTest.java
index 4d5f130..7e4b444 100644
--- a/core-common/src/test/java/org/apache/kylin/common/KylinConfigTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/KylinConfigTest.java
@@ -31,7 +31,7 @@ import org.junit.Test;
import com.google.common.collect.Maps;
-public class KylinConfigTest extends HotLoadKylinPropertiesTestCase{
+public class KylinConfigTest extends HotLoadKylinPropertiesTestCase {
@Test
public void testMRConfigOverride() {
KylinConfig config = KylinConfig.getInstanceFromEnv();
@@ -81,4 +81,19 @@ public class KylinConfigTest extends HotLoadKylinPropertiesTestCase{
assertEquals("kylin@kylin.apache.org", config.getKylinOwner());
}
+
+ @Test
+ public void testGetMetadataUrlPrefix() {
+ KylinConfig config = KylinConfig.getInstanceFromEnv();
+ final String default_metadata_prefix = "kylin_metadata";
+
+ config.setMetadataUrl("testMetaPrefix@hbase");
+ assertEquals("testMetaPrefix", config.getMetadataUrlPrefix());
+
+ config.setMetadataUrl("testMetaPrefix@hdfs");
+ assertEquals("testMetaPrefix", config.getMetadataUrlPrefix());
+
+ config.setMetadataUrl("/kylin/temp");
+ assertEquals(default_metadata_prefix, config.getMetadataUrlPrefix());
+ }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/f2e8b690/core-dictionary/src/test/java/org/apache/kylin/dict/CachedTreeMapTest.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/CachedTreeMapTest.java b/core-dictionary/src/test/java/org/apache/kylin/dict/CachedTreeMapTest.java
index ccf6e24..3c29d9c 100644
--- a/core-dictionary/src/test/java/org/apache/kylin/dict/CachedTreeMapTest.java
+++ b/core-dictionary/src/test/java/org/apache/kylin/dict/CachedTreeMapTest.java
@@ -30,6 +30,7 @@ import java.io.DataOutputStream;
import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
+import java.util.UUID;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -114,8 +115,10 @@ public class CachedTreeMapTest {
}
}
- public static final String baseDir = "/tmp/kylin_cachedtreemap_test/";
- public static final String workingDir = "/tmp/kylin_cachedtreemap_test/working";
+
+ static final UUID uuid = UUID.randomUUID();
+ static final String baseDir = "/tmp/kylin_cachedtreemap_test/" + uuid;
+ static final String workingDir = baseDir + "/working";
private static void cleanup() {
Path basePath = new Path(baseDir);
http://git-wip-us.apache.org/repos/asf/kylin/blob/f2e8b690/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java b/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java
index 2f37a50..2d79970 100644
--- a/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java
+++ b/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java
@@ -20,9 +20,7 @@ package org.apache.kylin.job;
import java.io.File;
import java.nio.charset.Charset;
-import java.util.Arrays;
-
-import javax.annotation.Nullable;
+import java.util.UUID;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
@@ -31,8 +29,6 @@ import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.retry.ExponentialBackoffRetry;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HConstants;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.HBaseMetadataTestCase;
import org.apache.kylin.job.engine.JobEngineConfig;
@@ -40,15 +36,13 @@ import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableManager;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.impl.threadpool.DistributedScheduler;
-import org.apache.kylin.storage.hbase.HBaseConnection;
import org.apache.kylin.storage.hbase.util.ZookeeperDistributedJobLock;
+import org.apache.kylin.storage.hbase.util.ZookeeperUtil;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
import com.google.common.io.Files;
public class BaseTestDistributedScheduler extends HBaseMetadataTestCase {
@@ -62,8 +56,8 @@ public class BaseTestDistributedScheduler extends HBaseMetadataTestCase {
static File localMetaDir;
static final String SEGMENT_ID = "segmentId";
- static final String segmentId1 = "segmentId1";
- static final String segmentId2 = "segmentId2";
+ static final String segmentId1 = "seg1" + UUID.randomUUID();
+ static final String segmentId2 = "seg2" + UUID.randomUUID();
static final String serverName1 = "serverName1";
static final String serverName2 = "serverName2";
static final String confDstPath1 = "target/kylin_metadata_dist_lock_test1/kylin.properties";
@@ -177,7 +171,7 @@ public class BaseTestDistributedScheduler extends HBaseMetadataTestCase {
}
private static void initZk() {
- String zkConnectString = getZKConnectString();
+ String zkConnectString = ZookeeperUtil.getZKConnectString();
if (StringUtils.isEmpty(zkConnectString)) {
throw new IllegalArgumentException("ZOOKEEPER_QUORUM is empty!");
}
@@ -186,19 +180,6 @@ public class BaseTestDistributedScheduler extends HBaseMetadataTestCase {
zkClient.start();
}
- private static String getZKConnectString() {
- Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
- final String serverList = conf.get(HConstants.ZOOKEEPER_QUORUM);
- final String port = conf.get(HConstants.ZOOKEEPER_CLIENT_PORT);
- return StringUtils.join(Iterables.transform(Arrays.asList(serverList.split(",")), new Function<String, String>() {
- @Nullable
- @Override
- public String apply(String input) {
- return input + ":" + port;
- }
- }), ",");
- }
-
String getServerName(String cubeName) {
String lockPath = getLockPath(cubeName);
String serverName = null;
http://git-wip-us.apache.org/repos/asf/kylin/blob/f2e8b690/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
index 53c89cf..f3b1ec9 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
@@ -36,6 +36,10 @@ import java.util.concurrent.TimeUnit;
import org.I0Itec.zkclient.ZkConnection;
import org.apache.commons.lang3.StringUtils;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.ClassUtil;
@@ -63,6 +67,7 @@ import org.apache.kylin.source.kafka.KafkaConfigManager;
import org.apache.kylin.source.kafka.config.BrokerConfig;
import org.apache.kylin.source.kafka.config.KafkaConfig;
import org.apache.kylin.storage.hbase.util.ZookeeperJobLock;
+import org.apache.kylin.storage.hbase.util.ZookeeperUtil;
import org.apache.kylin.tool.StorageCleanupJob;
import org.junit.Assert;
import org.slf4j.Logger;
@@ -84,6 +89,9 @@ public class BuildCubeWithStream {
private KafkaConfig kafkaConfig;
private MockKafka kafkaServer;
+ private ZkConnection zkConnection;
+ private final String kafkaZkPath = "/" + UUID.randomUUID().toString();
+
protected static boolean fastBuildMode = false;
private boolean generateData = true;
@@ -128,8 +136,9 @@ public class BuildCubeWithStream {
private void startEmbeddedKafka(String topicName, BrokerConfig brokerConfig) {
//Start mock Kakfa
- String zkConnectionStr = "sandbox:2181";
- ZkConnection zkConnection = new ZkConnection(zkConnectionStr);
+ String zkConnectionStr = ZookeeperUtil.getZKConnectString() + kafkaZkPath;
+ System.out.println("zkConnectionStr" + zkConnectionStr);
+ zkConnection = new ZkConnection(zkConnectionStr);
// Assert.assertEquals(ZooKeeper.States.CONNECTED, zkConnection.getZookeeperState());
kafkaServer = new MockKafka(zkConnection, brokerConfig.getPort(), brokerConfig.getId());
kafkaServer.start();
@@ -287,9 +296,24 @@ public class BuildCubeWithStream {
public void after() {
kafkaServer.stop();
+ cleanKafkaZkPath(kafkaZkPath);
DefaultScheduler.destroyInstance();
}
+ private void cleanKafkaZkPath(String path) {
+ RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+ CuratorFramework zkClient = CuratorFrameworkFactory.newClient(ZookeeperUtil.getZKConnectString(), retryPolicy);
+ zkClient.start();
+
+ try {
+ zkClient.delete().deletingChildrenIfNeeded().forPath(kafkaZkPath);
+ } catch (Exception e) {
+ logger.warn("Failed to delete zookeeper path: " + path, e);
+ } finally {
+ zkClient.close();
+ }
+ }
+
protected void waitForJob(String jobId) {
while (true) {
AbstractExecutable job = jobService.getJob(jobId);
@@ -327,6 +351,8 @@ public class BuildCubeWithStream {
buildCubeWithStream.before();
buildCubeWithStream.build();
logger.info("Build is done");
+
+ buildCubeWithStream.after();
buildCubeWithStream.cleanup();
logger.info("Going to exit");
} catch (Throwable e) {
@@ -336,7 +362,7 @@ public class BuildCubeWithStream {
long millis = System.currentTimeMillis() - start;
System.out.println("Time elapsed: " + (millis / 1000) + " sec - in " + BuildCubeWithStream.class.getName());
-
+
System.exit(exitCode);
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/f2e8b690/kylin-it/src/test/java/org/apache/kylin/provision/MockKafka.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/MockKafka.java b/kylin-it/src/test/java/org/apache/kylin/provision/MockKafka.java
index 3f47923..fce422a 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/MockKafka.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/MockKafka.java
@@ -29,6 +29,8 @@ import org.I0Itec.zkclient.ZkConnection;
import org.I0Itec.zkclient.exception.ZkMarshallingError;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.kafka.common.requests.MetadataResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import kafka.admin.AdminUtils;
import kafka.server.KafkaConfig;
@@ -52,6 +54,7 @@ public class MockKafka {
}
private KafkaServerStartable kafkaServer;
+ private static final Logger logger = LoggerFactory.getLogger(MockKafka.class);
private ZkConnection zkConnection;
@@ -67,7 +70,7 @@ public class MockKafka {
public MockKafka(ZkConnection zkServerConnection, int port, int brokerId) {
this(zkServerConnection, System.getProperty("java.io.tmpdir") + "/" + UUID.randomUUID().toString(), String.valueOf(port), String.valueOf(brokerId));
- start();
+ //start();
}
private MockKafka(ZkConnection zkServerConnection, String logDir, String port, String brokerId) {
@@ -110,13 +113,9 @@ public class MockKafka {
zkClient.close();
}
- public String getConnectionString() {
- return String.format("%s:%d", kafkaServer.serverConfig().hostName(), kafkaServer.serverConfig().port());
- }
-
public void start() {
kafkaServer.startup();
- System.out.println("embedded kafka is up");
+ System.out.println("--embedded kafka is up");
}
public void stop() {
http://git-wip-us.apache.org/repos/asf/kylin/blob/f2e8b690/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java
index ee7cd50..983bfd9 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java
@@ -19,11 +19,8 @@
package org.apache.kylin.storage.hbase.util;
import java.nio.charset.Charset;
-import java.util.Arrays;
import java.util.concurrent.ExecutorService;
-import javax.annotation.Nullable;
-
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
@@ -33,18 +30,12 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HConstants;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.job.lock.DistributedJobLock;
-import org.apache.kylin.storage.hbase.HBaseConnection;
import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
-
/**
* the jobLock is specially used to support distributed scheduler.
*/
@@ -65,7 +56,7 @@ public class ZookeeperDistributedJobLock implements DistributedJobLock {
public ZookeeperDistributedJobLock(KylinConfig config) {
this.config = config;
- String zkConnectString = getZKConnectString();
+ String zkConnectString = ZookeeperUtil.getZKConnectString();
logger.info("zk connection string:" + zkConnectString);
if (StringUtils.isEmpty(zkConnectString)) {
throw new IllegalArgumentException("ZOOKEEPER_QUORUM is empty!");
@@ -243,19 +234,6 @@ public class ZookeeperDistributedJobLock implements DistributedJobLock {
}
}
- private static String getZKConnectString() {
- Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
- final String serverList = conf.get(HConstants.ZOOKEEPER_QUORUM);
- final String port = conf.get(HConstants.ZOOKEEPER_CLIENT_PORT);
- return StringUtils.join(Iterables.transform(Arrays.asList(serverList.split(",")), new Function<String, String>() {
- @Nullable
- @Override
- public String apply(String input) {
- return input + ":" + port;
- }
- }), ",");
- }
-
private String getLockPath(String pathName) {
return ZOOKEEPER_LOCK_PATH + "/" + config.getMetadataUrlPrefix() + "/" + pathName;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/f2e8b690/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperUtil.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperUtil.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperUtil.java
new file mode 100644
index 0000000..b5ebe89
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperUtil.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.util;
+
+import java.util.Arrays;
+
+import javax.annotation.Nullable;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.kylin.storage.hbase.HBaseConnection;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+
+public class ZookeeperUtil {
+
+ /**
+ * Get zookeeper connection string from HBase Configuration
+ *
+ * @return Zookeeper Connection string
+ */
+ public static String getZKConnectString() {
+ Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
+ final String serverList = conf.get(HConstants.ZOOKEEPER_QUORUM);
+ final String port = conf.get(HConstants.ZOOKEEPER_CLIENT_PORT);
+ return StringUtils.join(Iterables.transform(Arrays.asList(serverList.split(",")), new Function<String, String>() {
+ @Nullable
+ @Override
+ public String apply(String input) {
+ return input + ":" + port;
+ }
+ }), ",");
+ }
+}