You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2019/03/13 09:32:19 UTC
[kylin] branch master updated: KYLIN-3865 Centralize the zookeeper related info
This is an automated email from the ASF dual-hosted git repository.
shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push:
new a6aa9c8 KYLIN-3865 Centralize the zookeeper related info
a6aa9c8 is described below
commit a6aa9c812aa7cf93d97fcd8eb554a8c289d1f20d
Author: kyotoYaho <nj...@apache.org>
AuthorDate: Mon Mar 11 18:18:14 2019 +0800
KYLIN-3865 Centralize the zookeeper related info
---
.../org/apache/kylin/common/KylinConfigBase.java | 18 +-
.../java/org/apache/kylin/common/util/ZKUtil.java | 217 +++++++++++++++++++++
.../apache/kylin/common/util/ZooKeeperUtil.java | 63 ------
.../kylin/job/impl/curator/CuratorScheduler.java | 40 ++--
.../lock/zookeeper}/ZookeeperDistributedLock.java | 112 +----------
.../job/lock/zookeeper}/ZookeeperJobLock.java | 2 +-
.../impl/curator/CuratorLeaderSelectorTest.java | 2 +-
.../job/impl/curator/CuratorSchedulerTest.java | 23 ++-
.../kylin/job/impl/curator/ExampleServer.java | 11 +-
.../kylin/job/BaseTestDistributedScheduler.java | 24 +--
.../kylin/provision/BuildCubeWithEngine.java | 2 +-
.../kylin/provision/BuildCubeWithStream.java | 13 +-
.../hbase/ITZookeeperDistributedLockTest.java | 2 +-
.../org/apache/kylin/rest/service/JobService.java | 8 +-
.../kylin/storage/hbase/util/ZookeeperUtil.java | 34 ----
15 files changed, 287 insertions(+), 284 deletions(-)
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 0a2b9ae..3edd458 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -40,7 +40,6 @@ import org.apache.kylin.common.lock.DistributedLockFactory;
import org.apache.kylin.common.util.ClassUtil;
import org.apache.kylin.common.util.CliCommandExecutor;
import org.apache.kylin.common.util.HadoopUtil;
-import org.apache.kylin.common.util.ZooKeeperUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -351,15 +350,7 @@ public abstract class KylinConfigBase implements Serializable {
* A comma separated list of host:port pairs, each corresponding to a ZooKeeper server
*/
public String getZookeeperConnectString() {
- String str = getOptional("kylin.env.zookeeper-connect-string");
- if (str != null)
- return str;
-
- str = ZooKeeperUtil.getZKConnectStringFromHBase();
- if (str != null)
- return str;
-
- throw new RuntimeException("Please set 'kylin.env.zookeeper-connect-string' in kylin.properties");
+ return getOptional("kylin.env.zookeeper-connect-string");
}
public int getZKBaseSleepTimeMs() {
@@ -466,7 +457,7 @@ public abstract class KylinConfigBase implements Serializable {
public DistributedLockFactory getDistributedLockFactory() {
String clsName = getOptional("kylin.metadata.distributed-lock-impl",
- "org.apache.kylin.storage.hbase.util.ZookeeperDistributedLock$Factory");
+ "org.apache.kylin.job.lock.zookeeper.ZookeeperDistributedLock$Factory");
return (DistributedLockFactory) ClassUtil.newInstance(clsName);
}
@@ -1735,7 +1726,10 @@ public abstract class KylinConfigBase implements Serializable {
}
public String getClusterName() {
- return this.getOptional("kylin.server.cluster-name", getMetadataUrlPrefix());
+ String key = "kylin.server.cluster-name";
+ String clusterName = this.getOptional(key, getMetadataUrlPrefix());
+ setProperty(key, clusterName);
+ return clusterName;
}
public String getInitTasks() {
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/ZKUtil.java b/core-common/src/main/java/org/apache/kylin/common/util/ZKUtil.java
new file mode 100644
index 0000000..8979022
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/util/ZKUtil.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.Nullable;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.imps.CuratorFrameworkState;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Strings;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import com.google.common.collect.Iterables;
+
+public class ZKUtil {
+ private static final Logger logger = LoggerFactory.getLogger(ZKUtil.class);
+
+ private static final KylinConfig defaultKylinConfig = KylinConfig.getInstanceFromEnv();
+ private static final String zkChRoot = fixPath(defaultKylinConfig.getZookeeperBasePath(),
+ defaultKylinConfig.getClusterName());
+
+ private static String fixPath(String parent, String child) {
+ String path = ZKPaths.makePath(parent, child);
+
+ try {
+ return new File(path).getCanonicalPath();
+ } catch (IOException e) {
+ logger.error("get canonical path failed, use original path", e);
+ return path;
+ }
+ }
+
+ private static Cache<String, CuratorFramework> CACHE = CacheBuilder.newBuilder()
+ .removalListener(new RemovalListener<String, CuratorFramework>() {
+ @Override
+ public void onRemoval(RemovalNotification<String, CuratorFramework> notification) {
+ logger.info("CuratorFramework for zkString " + notification.getKey() + " is removed due to "
+ + notification.getCause());
+ CuratorFramework curator = notification.getValue();
+ try {
+ curator.close();
+ } catch (Exception ex) {
+ logger.error("Error at closing " + curator, ex);
+ }
+ }
+ }).expireAfterWrite(1, TimeUnit.DAYS).build();
+
+ static {
+ Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+ @Override
+ public void run() {
+ logger.info("Going to remove {} cached curator clients", CACHE.size());
+ CACHE.invalidateAll();
+ }
+ }));
+ }
+
+ /**
+ * Get zookeeper connection string from kylin.properties, falling back to the HBase configuration if it is not set there
+ */
+ public static String getZKConnectString(KylinConfig config) {
+ String zkString = config.getZookeeperConnectString();
+ if (zkString == null) {
+ zkString = getZKConnectStringFromHBase();
+ if (zkString == null) {
+ throw new RuntimeException("Please set 'kylin.env.zookeeper-connect-string' in kylin.properties");
+ }
+ }
+
+ return zkString;
+ }
+
+ public static CuratorFramework getZookeeperClient(KylinConfig config) {
+ RetryPolicy retryPolicy = getRetryPolicy(config);
+ return getZookeeperClient(getZKConnectString(config), retryPolicy);
+ }
+
+ private static CuratorFramework getZookeeperClient(final String zkString, final RetryPolicy retryPolicy) {
+ if (StringUtils.isEmpty(zkString)) {
+ throw new IllegalArgumentException("ZOOKEEPER_QUORUM is empty!");
+ }
+ try {
+ CuratorFramework instance = CACHE.get(zkString, new Callable<CuratorFramework>() {
+ @Override
+ public CuratorFramework call() throws Exception {
+ return newZookeeperClient(zkString, retryPolicy);
+ }
+ });
+ // during test, curator may be closed by others, remove it from CACHE and reinitialize a new one
+ if (instance.getState() != CuratorFrameworkState.STARTED) {
+ logger.warn("curator for {} is closed by others unexpectedly, reinitialize a new one", zkString);
+ CACHE.invalidate(zkString);
+ instance = getZookeeperClient(zkString, retryPolicy);
+ }
+ return instance;
+ } catch (Throwable e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @VisibleForTesting
+ //no cache
+ public static CuratorFramework newZookeeperClient() {
+ return newZookeeperClient(KylinConfig.getInstanceFromEnv());
+ }
+
+ @VisibleForTesting
+ //no cache
+ public static CuratorFramework newZookeeperClient(KylinConfig config) {
+ RetryPolicy retryPolicy = getRetryPolicy(config);
+ return newZookeeperClient(getZKConnectString(config), retryPolicy);
+ }
+
+ @VisibleForTesting
+ //no cache
+ public static CuratorFramework newZookeeperClient(String zkString, RetryPolicy retryPolicy) {
+ if (zkChRoot == null)
+ throw new NullPointerException("zkChRoot must not be null");
+
+ logger.info("zookeeper connection string: {} with namespace {}", zkString, zkChRoot);
+
+ CuratorFramework instance = getCuratorFramework(zkString, zkChRoot, retryPolicy);
+ instance.start();
+ logger.info("new zookeeper Client start: " + zkString);
+ // create zkChRoot znode if necessary
+ createZkChRootIfNecessary(instance, zkString);
+ return instance;
+ }
+
+ private static RetryPolicy getRetryPolicy(KylinConfig config) {
+ int baseSleepTimeMs = config.getZKBaseSleepTimeMs();
+ int maxRetries = config.getZKMaxRetries();
+ return new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries);
+ }
+
+ private static synchronized void createZkChRootIfNecessary(CuratorFramework instance, String zkString) {
+ try {
+ RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+ if (instance.checkExists().forPath("/") == null) {
+ CuratorFramework tmpCurator = getCuratorFramework(zkString, null, retryPolicy);
+ tmpCurator.start();
+ tmpCurator.create().creatingParentsIfNeeded().forPath(zkChRoot);
+ tmpCurator.close();
+ }
+ } catch (KeeperException.NodeExistsException e) {
+ logger.warn("The chRoot znode {} has been created by others", zkChRoot);
+ } catch (Exception e) {
+ throw new RuntimeException("Fail to check or create znode for chRoot " + zkChRoot + " due to ", e);
+ }
+ }
+
+ private static CuratorFramework getCuratorFramework(String zkString, String zkChRoot, RetryPolicy retryPolicy) {
+ if (!Strings.isNullOrEmpty(zkChRoot)) {
+ zkString += zkChRoot;
+ }
+ return CuratorFrameworkFactory.newClient(zkString, 120000, 15000, retryPolicy);
+ }
+
+ private static String getZKConnectStringFromHBase() {
+ Configuration hconf = null;
+ try {
+ Class<? extends Object> hbaseConnClz = ClassUtil.forName("org.apache.kylin.storage.hbase.HBaseConnection",
+ Object.class);
+ hconf = (Configuration) hbaseConnClz.getMethod("getCurrentHBaseConfiguration").invoke(null);
+ } catch (Throwable ex) {
+ logger.warn("Failed to get zookeeper connect string from HBase configuration", ex);
+ return null;
+ }
+
+ final String serverList = hconf.get("hbase.zookeeper.quorum");
+ final String port = hconf.get("hbase.zookeeper.property.clientPort");
+ return StringUtils
+ .join(Iterables.transform(Arrays.asList(serverList.split(",")), new Function<String, String>() {
+ @Nullable
+ @Override
+ public String apply(String input) {
+ return input + ":" + port;
+ }
+ }), ",");
+ }
+}
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/ZooKeeperUtil.java b/core-common/src/main/java/org/apache/kylin/common/util/ZooKeeperUtil.java
deleted file mode 100644
index 48a70b9..0000000
--- a/core-common/src/main/java/org/apache/kylin/common/util/ZooKeeperUtil.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.common.util;
-
-import java.util.Arrays;
-
-import javax.annotation.Nullable;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
-
-/**
- * Use reflection to get zookeeper connect string from HBase configuration.
- */
-public class ZooKeeperUtil {
- private static final Logger logger = LoggerFactory.getLogger(ZooKeeperUtil.class);
-
- private ZooKeeperUtil() {
- throw new IllegalStateException("Class ZooKeeperUtil is an utility class !");
- }
-
- public static String getZKConnectStringFromHBase() {
- Configuration hconf = null;
- try {
- Class<? extends Object> hbaseConnClz = ClassUtil.forName("org.apache.kylin.storage.hbase.HBaseConnection", Object.class);
- hconf = (Configuration) hbaseConnClz.getMethod("getCurrentHBaseConfiguration").invoke(null);
- } catch (Throwable ex) {
- logger.warn("Failed to get zookeeper connect string from HBase configuration", ex);
- return null;
- }
-
- final String serverList = hconf.get("hbase.zookeeper.quorum");
- final String port = hconf.get("hbase.zookeeper.property.clientPort");
- return StringUtils.join(Iterables.transform(Arrays.asList(serverList.split(",")), new Function<String, String>() {
- @Nullable
- @Override
- public String apply(String input) {
- return input + ":" + port;
- }
- }), ",");
- }
-}
diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/curator/CuratorScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/curator/CuratorScheduler.java
index 36be923..ff6e561 100644
--- a/core-job/src/main/java/org/apache/kylin/job/impl/curator/CuratorScheduler.java
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/curator/CuratorScheduler.java
@@ -23,7 +23,6 @@ import java.io.IOException;
import java.net.InetAddress;
import java.util.LinkedHashMap;
import java.util.List;
-import java.util.Locale;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -32,9 +31,7 @@ import javax.annotation.Nullable;
import org.apache.commons.io.IOUtils;
import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.state.ConnectionState;
-import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.x.discovery.ServiceCache;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
@@ -44,6 +41,7 @@ import org.apache.curator.x.discovery.details.ServiceCacheListener;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.ServerMode;
import org.apache.kylin.common.util.StringUtil;
+import org.apache.kylin.common.util.ZKUtil;
import org.apache.kylin.job.Scheduler;
import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.exception.SchedulerException;
@@ -55,6 +53,7 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
@@ -69,33 +68,36 @@ public class CuratorScheduler implements Scheduler<AbstractExecutable> {
private KylinConfig kylinConfig;
private AtomicInteger count = new AtomicInteger();
- static final String JOB_ENGINE_LEADER_PATH = "/kylin/%s/job_engine/leader";
- static final String KYLIN_SERVICE_PATH = "/kylin/%s/service";
+ static final String JOB_ENGINE_LEADER_PATH = "/job_engine/leader";
+ static final String KYLIN_SERVICE_PATH = "/service";
static final String SERVICE_NAME = "kylin";
static final String SERVICE_PAYLOAD_DESCRIPTION = "description";
+ // the default constructor should exist for reflection initialization
public CuratorScheduler() {
}
+ @VisibleForTesting
+ CuratorScheduler(CuratorFramework curatorClient) {
+ this.curatorClient = curatorClient;
+ }
+
@Override
public void init(JobEngineConfig jobEngineConfig, JobLock jobLock) throws SchedulerException {
kylinConfig = jobEngineConfig.getConfig();
- String zkAddress = kylinConfig.getZookeeperConnectString();
-
synchronized (this) {
if (started) {
logger.info("CuratorScheduler already started, skipped.");
return;
}
- int baseSleepTimeMs = kylinConfig.getZKBaseSleepTimeMs();
- int maxRetries = kylinConfig.getZKMaxRetries();
- curatorClient = CuratorFrameworkFactory.newClient(zkAddress,
- new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries));
- logger.info("New ZK Client start: ", zkAddress);
- curatorClient.start();
+ // curatorClient is already assigned at this point only in test cases, which create
+ // an independent curator client instead of sharing the cached one, to avoid interfering with each other
+ if (curatorClient == null) {
+ curatorClient = ZKUtil.getZookeeperClient(kylinConfig);
+ }
final String restAddress = kylinConfig.getServerRestAddress();
try {
@@ -104,7 +106,7 @@ public class CuratorScheduler implements Scheduler<AbstractExecutable> {
throw new SchedulerException(e);
}
- String jobEnginePath = getJobEnginePath(slickMetadataPrefix(kylinConfig.getMetadataUrlPrefix()));
+ String jobEnginePath = JOB_ENGINE_LEADER_PATH;
if (ServerMode.isJob(jobEngineConfig.getConfig())) {
jobClient = new CuratorLeaderSelector(curatorClient, jobEnginePath, restAddress, jobEngineConfig);
@@ -127,7 +129,7 @@ public class CuratorScheduler implements Scheduler<AbstractExecutable> {
final String port = restAddress.substring(restAddress.indexOf(":") + 1);
final JsonInstanceSerializer<LinkedHashMap> serializer = new JsonInstanceSerializer<>(LinkedHashMap.class);
- final String servicePath = getServicePath(slickMetadataPrefix(kylinConfig.getMetadataUrlPrefix()));
+ final String servicePath = KYLIN_SERVICE_PATH;
serviceDiscovery = ServiceDiscoveryBuilder.builder(LinkedHashMap.class).client(curatorClient)
.basePath(servicePath).serializer(serializer).build();
serviceDiscovery.start();
@@ -179,14 +181,6 @@ public class CuratorScheduler implements Scheduler<AbstractExecutable> {
serviceDiscovery.registerService(thisInstance);
}
- static String getJobEnginePath(String metadataUrlPrefix) {
- return String.format(Locale.ROOT, JOB_ENGINE_LEADER_PATH, metadataUrlPrefix);
- }
-
- static String getServicePath(String metadataUrlPrefix) {
- return String.format(Locale.ROOT, KYLIN_SERVICE_PATH, metadataUrlPrefix);
- }
-
private void monitorJobEngine() {
logger.info("Start collect monitor ZK Participants");
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(new Runnable() {
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedLock.java b/core-job/src/main/java/org/apache/kylin/job/lock/zookeeper/ZookeeperDistributedLock.java
similarity index 66%
rename from storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedLock.java
rename to core-job/src/main/java/org/apache/kylin/job/lock/zookeeper/ZookeeperDistributedLock.java
index fffffd7..07b06db 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedLock.java
+++ b/core-job/src/main/java/org/apache/kylin/job/lock/zookeeper/ZookeeperDistributedLock.java
@@ -16,31 +16,24 @@
* limitations under the License.
*/
-package org.apache.kylin.storage.hbase.util;
+package org.apache.kylin.job.lock.zookeeper;
import java.io.Closeable;
-import java.io.File;
-import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Random;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
-import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
-import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.lock.DistributedLock;
import org.apache.kylin.common.lock.DistributedLockFactory;
+import org.apache.kylin.common.util.ZKUtil;
import org.apache.kylin.job.lock.JobLock;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.Shell;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,51 +47,6 @@ public class ZookeeperDistributedLock implements DistributedLock, JobLock {
private static final Random random = new Random();
public static class Factory extends DistributedLockFactory {
-
- private static final ConcurrentMap<KylinConfig, CuratorFramework> CACHE = new ConcurrentHashMap<KylinConfig, CuratorFramework>();
-
- static {
- Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
- @Override
- public void run() {
- for (CuratorFramework curator : CACHE.values()) {
- try {
- curator.close();
- } catch (Exception ex) {
- logger.error("Error at closing " + curator, ex);
- }
- }
- }
- }));
- }
-
- private static CuratorFramework getZKClient(KylinConfig config) {
- CuratorFramework zkClient = CACHE.get(config);
- if (zkClient == null) {
- synchronized (ZookeeperDistributedLock.class) {
- zkClient = CACHE.get(config);
- if (zkClient == null) {
- RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
- String zkConnectString = getZKConnectString(config);
- ZookeeperAclBuilder zookeeperAclBuilder = new ZookeeperAclBuilder().invoke();
- zkClient = zookeeperAclBuilder.setZKAclBuilder(CuratorFrameworkFactory.builder()).connectString(zkConnectString).sessionTimeoutMs(120000).connectionTimeoutMs(15000).retryPolicy(retryPolicy).build();
- zkClient.start();
- CACHE.put(config, zkClient);
- if (CACHE.size() > 1) {
- logger.warn("More than one singleton exist");
- }
- }
- }
- }
- return zkClient;
- }
-
- private static String getZKConnectString(KylinConfig config) {
- // the ZKConnectString should come from KylinConfig, however it is taken from HBase configuration at the moment
- return ZookeeperUtil.getZKConnectString();
- }
-
- final String zkPathBase;
final CuratorFramework curator;
public Factory() {
@@ -106,31 +54,30 @@ public class ZookeeperDistributedLock implements DistributedLock, JobLock {
}
public Factory(KylinConfig config) {
- this.curator = getZKClient(config);
- this.zkPathBase = fixSlash(config.getZookeeperBasePath() + "/" + config.getMetadataUrlPrefix());
+ this(ZKUtil.getZookeeperClient(config));
+ }
+
+ public Factory(CuratorFramework curator) {
+ this.curator = curator;
}
@Override
public DistributedLock lockForClient(String client) {
- return new ZookeeperDistributedLock(curator, zkPathBase, client);
+ return new ZookeeperDistributedLock(curator, client);
}
}
// ============================================================================
final CuratorFramework curator;
- final String zkPathBase;
final String client;
final byte[] clientBytes;
- private ZookeeperDistributedLock(CuratorFramework curator, String zkPathBase, String client) {
+ private ZookeeperDistributedLock(CuratorFramework curator, String client) {
if (client == null)
throw new NullPointerException("client must not be null");
- if (zkPathBase == null)
- throw new NullPointerException("zkPathBase must not be null");
this.curator = curator;
- this.zkPathBase = zkPathBase;
this.client = client;
this.clientBytes = client.getBytes(StandardCharsets.UTF_8);
}
@@ -142,8 +89,6 @@ public class ZookeeperDistributedLock implements DistributedLock, JobLock {
@Override
public boolean lock(String lockPath) {
- lockPath = norm(lockPath);
-
logger.debug("{} trying to lock {}", client, lockPath);
try {
@@ -166,8 +111,6 @@ public class ZookeeperDistributedLock implements DistributedLock, JobLock {
@Override
public boolean lock(String lockPath, long timeout) {
- lockPath = norm(lockPath);
-
if (lock(lockPath))
return true;
@@ -198,8 +141,6 @@ public class ZookeeperDistributedLock implements DistributedLock, JobLock {
@Override
public String peekLock(String lockPath) {
- lockPath = norm(lockPath);
-
try {
byte[] bytes = curator.getData().forPath(lockPath);
return new String(bytes, StandardCharsets.UTF_8);
@@ -222,8 +163,6 @@ public class ZookeeperDistributedLock implements DistributedLock, JobLock {
@Override
public void unlock(String lockPath) {
- lockPath = norm(lockPath);
-
logger.debug("{} trying to unlock {}", client, lockPath);
String owner = peekLock(lockPath);
@@ -244,8 +183,6 @@ public class ZookeeperDistributedLock implements DistributedLock, JobLock {
@Override
public void purgeLocks(String lockPathRoot) {
- lockPathRoot = norm(lockPathRoot);
-
try {
curator.delete().guaranteed().deletingChildrenIfNeeded().forPath(lockPathRoot);
@@ -258,8 +195,6 @@ public class ZookeeperDistributedLock implements DistributedLock, JobLock {
@Override
public Closeable watchLocks(String lockPathRoot, Executor executor, final Watcher watcher) {
- lockPathRoot = norm(lockPathRoot);
-
PathChildrenCache cache = new PathChildrenCache(curator, lockPathRoot, true);
try {
cache.start();
@@ -284,35 +219,6 @@ public class ZookeeperDistributedLock implements DistributedLock, JobLock {
return cache;
}
- // normalize lock path
- private String norm(String lockPath) {
- if (!lockPath.startsWith(zkPathBase))
- lockPath = zkPathBase + (lockPath.startsWith("/") ? "" : "/") + lockPath;
- return fixSlash(lockPath);
- }
-
- private static String fixSlash(String path) {
- if (!path.startsWith("/"))
- path = "/" + path;
- if (path.endsWith("/"))
- path = path.substring(0, path.length() - 1);
- for (int n = Integer.MAX_VALUE; n > path.length();) {
- n = path.length();
- path = path.replace("//", "/");
- }
-
- if (Shell.WINDOWS) {
- return new File(path).toURI().getPath();
- } else {
- try {
- return new File(path).getCanonicalPath();
- } catch (IOException e) {
- logger.error("get canonical path failed, use original path", e);
- return path;
- }
- }
- }
-
// ============================================================================
@Override
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperJobLock.java b/core-job/src/main/java/org/apache/kylin/job/lock/zookeeper/ZookeeperJobLock.java
similarity index 98%
rename from storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperJobLock.java
rename to core-job/src/main/java/org/apache/kylin/job/lock/zookeeper/ZookeeperJobLock.java
index 6e7890b..fcb4f9c 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperJobLock.java
+++ b/core-job/src/main/java/org/apache/kylin/job/lock/zookeeper/ZookeeperJobLock.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.kylin.storage.hbase.util;
+package org.apache.kylin.job.lock.zookeeper;
import java.io.Closeable;
import java.util.concurrent.Executor;
diff --git a/core-job/src/test/java/org/apache/kylin/job/impl/curator/CuratorLeaderSelectorTest.java b/core-job/src/test/java/org/apache/kylin/job/impl/curator/CuratorLeaderSelectorTest.java
index 45c0129..e8ba0b2 100644
--- a/core-job/src/test/java/org/apache/kylin/job/impl/curator/CuratorLeaderSelectorTest.java
+++ b/core-job/src/test/java/org/apache/kylin/job/impl/curator/CuratorLeaderSelectorTest.java
@@ -52,7 +52,7 @@ public class CuratorLeaderSelectorTest extends LocalFileMetadataTestCase {
final String zkString = zkTestServer.getConnectString();
final String server1 = "server1:1111";
final String server2 = "server2:2222";
- String jobEnginePath = CuratorScheduler.getJobEnginePath(CuratorScheduler.slickMetadataPrefix(kylinConfig.getMetadataUrlPrefix()));
+ String jobEnginePath = CuratorScheduler.JOB_ENGINE_LEADER_PATH;
CuratorFramework client = CuratorFrameworkFactory.newClient(zkString, new ExponentialBackoffRetry(3000, 3));
client.start();
CuratorLeaderSelector s1 = new CuratorLeaderSelector(client //
diff --git a/core-job/src/test/java/org/apache/kylin/job/impl/curator/CuratorSchedulerTest.java b/core-job/src/test/java/org/apache/kylin/job/impl/curator/CuratorSchedulerTest.java
index dbe5bba..d757ccd 100644
--- a/core-job/src/test/java/org/apache/kylin/job/impl/curator/CuratorSchedulerTest.java
+++ b/core-job/src/test/java/org/apache/kylin/job/impl/curator/CuratorSchedulerTest.java
@@ -17,18 +17,21 @@
*/
package org.apache.kylin.job.impl.curator;
-import com.google.common.base.Function;
-import com.google.common.collect.Lists;
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.List;
+
+import javax.annotation.Nullable;
+
import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingServer;
import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.apache.curator.x.discovery.ServiceInstance;
-import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.common.util.ZKUtil;
import org.apache.kylin.job.execution.ExecutableManager;
import org.junit.After;
import org.junit.Assert;
@@ -37,10 +40,8 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.annotation.Nullable;
-import java.util.Collection;
-import java.util.LinkedHashMap;
-import java.util.List;
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
/**
*/
@@ -75,7 +76,6 @@ public class CuratorSchedulerTest extends LocalFileMetadataTestCase {
public void test() throws Exception {
final String zkString = zkTestServer.getConnectString();
- final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
ServiceDiscovery<LinkedHashMap> serviceDiscovery = null;
CuratorFramework curatorClient = null;
@@ -83,9 +83,8 @@ public class CuratorSchedulerTest extends LocalFileMetadataTestCase {
final CuratorScheduler.JsonInstanceSerializer<LinkedHashMap> serializer = new CuratorScheduler.JsonInstanceSerializer<>(
LinkedHashMap.class);
- String servicePath = CuratorScheduler.getServicePath(CuratorScheduler.slickMetadataPrefix(kylinConfig.getMetadataUrlPrefix()));
- curatorClient = CuratorFrameworkFactory.newClient(zkString, new ExponentialBackoffRetry(3000, 3));
- curatorClient.start();
+ String servicePath = CuratorScheduler.KYLIN_SERVICE_PATH;
+ curatorClient = ZKUtil.newZookeeperClient(zkString, new ExponentialBackoffRetry(3000, 3));
serviceDiscovery = ServiceDiscoveryBuilder.builder(LinkedHashMap.class).client(curatorClient)
.basePath(servicePath).serializer(serializer).build();
serviceDiscovery.start();
diff --git a/core-job/src/test/java/org/apache/kylin/job/impl/curator/ExampleServer.java b/core-job/src/test/java/org/apache/kylin/job/impl/curator/ExampleServer.java
index a6df81e..66e3832 100644
--- a/core-job/src/test/java/org/apache/kylin/job/impl/curator/ExampleServer.java
+++ b/core-job/src/test/java/org/apache/kylin/job/impl/curator/ExampleServer.java
@@ -18,14 +18,16 @@
package org.apache.kylin.job.impl.curator;
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.curator.framework.CuratorFramework;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.ZKUtil;
import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.exception.SchedulerException;
import org.apache.kylin.job.lock.MockJobLock;
-import java.io.Closeable;
-import java.io.IOException;
-
/**
*/
public class ExampleServer implements Closeable {
@@ -40,7 +42,8 @@ public class ExampleServer implements Closeable {
KylinConfig kylinConfig1 = KylinConfig.createKylinConfig(kylinConfig);
kylinConfig1.setProperty("kylin.server.host-address", address);
- scheduler = new CuratorScheduler();
+ CuratorFramework client = ZKUtil.newZookeeperClient(kylinConfig1);
+ scheduler = new CuratorScheduler(client);
scheduler.init(new JobEngineConfig(kylinConfig1), new MockJobLock());
if (!scheduler.hasStarted()) {
throw new RuntimeException("scheduler has not been started");
diff --git a/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java b/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java
index 5a2091d..6c6a36e 100644
--- a/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java
+++ b/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java
@@ -22,22 +22,18 @@ import java.io.File;
import java.nio.charset.Charset;
import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.imps.CuratorFrameworkState;
-import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.HBaseMetadataTestCase;
import org.apache.kylin.common.util.RandomUtil;
+import org.apache.kylin.common.util.ZKUtil;
import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableManager;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.impl.threadpool.DistributedScheduler;
-import org.apache.kylin.storage.hbase.util.ZookeeperDistributedLock;
-import org.apache.kylin.storage.hbase.util.ZookeeperUtil;
+import org.apache.kylin.job.lock.zookeeper.ZookeeperDistributedLock;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.slf4j.Logger;
@@ -111,7 +107,7 @@ public class BaseTestDistributedScheduler extends HBaseMetadataTestCase {
@AfterClass
public static void after() throws Exception {
- jobLock1.purgeLocks("");
+ jobLock1.purgeLocks(DistributedScheduler.ZOOKEEPER_LOCK_PATH);
if (scheduler1 != null) {
scheduler1.shutdown();
@@ -167,17 +163,11 @@ public class BaseTestDistributedScheduler extends HBaseMetadataTestCase {
}
private static void initZk() {
- String zkConnectString = ZookeeperUtil.getZKConnectString();
- if (StringUtils.isEmpty(zkConnectString)) {
- throw new IllegalArgumentException("ZOOKEEPER_QUORUM is empty!");
- }
- RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
- zkClient = CuratorFrameworkFactory.newClient(zkConnectString, retryPolicy);
- zkClient.start();
+ zkClient = ZKUtil.newZookeeperClient();
}
String getServerName(String segName) {
- String lockPath = getFullLockPath(segName);
+ String lockPath = DistributedScheduler.getLockPath(segName);
String serverName = null;
if (zkClient.getState().equals(CuratorFrameworkState.STARTED)) {
try {
@@ -191,8 +181,4 @@ public class BaseTestDistributedScheduler extends HBaseMetadataTestCase {
}
return serverName;
}
-
- private String getFullLockPath(String segName) {
- return DistributedScheduler.dropDoubleSlash("/kylin/" + kylinConfig1.getMetadataUrlPrefix() + DistributedScheduler.getLockPath(segName));
- }
}
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
index ec5bc35..bd3e7c5 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
@@ -70,7 +70,7 @@ import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.rest.job.StorageCleanupJob;
import org.apache.kylin.storage.hbase.HBaseConnection;
import org.apache.kylin.storage.hbase.util.HBaseRegionSizeCalculator;
-import org.apache.kylin.storage.hbase.util.ZookeeperJobLock;
+import org.apache.kylin.job.lock.zookeeper.ZookeeperJobLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
index 2153821..5b6f26a 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
@@ -36,15 +36,13 @@ import java.util.concurrent.TimeUnit;
import org.I0Itec.zkclient.ZkConnection;
import org.apache.commons.lang3.StringUtils;
-import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.ClassUtil;
import org.apache.kylin.common.util.HBaseMetadataTestCase;
import org.apache.kylin.common.util.RandomUtil;
+import org.apache.kylin.common.util.ZKUtil;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
@@ -56,6 +54,7 @@ import org.apache.kylin.job.execution.DefaultChainedExecutable;
import org.apache.kylin.job.execution.ExecutableManager;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
+import org.apache.kylin.job.lock.zookeeper.ZookeeperJobLock;
import org.apache.kylin.job.streaming.Kafka10DataLoader;
import org.apache.kylin.metadata.model.SegmentRange;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
@@ -68,8 +67,6 @@ import org.apache.kylin.source.SourcePartition;
import org.apache.kylin.source.kafka.KafkaConfigManager;
import org.apache.kylin.source.kafka.config.BrokerConfig;
import org.apache.kylin.source.kafka.config.KafkaConfig;
-import org.apache.kylin.storage.hbase.util.ZookeeperJobLock;
-import org.apache.kylin.storage.hbase.util.ZookeeperUtil;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -139,7 +136,7 @@ public class BuildCubeWithStream {
private void startEmbeddedKafka(String topicName, BrokerConfig brokerConfig) {
//Start mock Kakfa
- String zkConnectionStr = ZookeeperUtil.getZKConnectString() + kafkaZkPath;
+ String zkConnectionStr = ZKUtil.getZKConnectString(KylinConfig.getInstanceFromEnv()) + kafkaZkPath;
System.out.println("zkConnectionStr" + zkConnectionStr);
zkConnection = new ZkConnection(zkConnectionStr);
// Assert.assertEquals(ZooKeeper.States.CONNECTED, zkConnection.getZookeeperState());
@@ -333,9 +330,7 @@ public class BuildCubeWithStream {
}
private void cleanKafkaZkPath(String path) {
- RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
- CuratorFramework zkClient = CuratorFrameworkFactory.newClient(ZookeeperUtil.getZKConnectString(), retryPolicy);
- zkClient.start();
+ CuratorFramework zkClient = ZKUtil.newZookeeperClient();
try {
zkClient.delete().deletingChildrenIfNeeded().forPath(kafkaZkPath);
diff --git a/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITZookeeperDistributedLockTest.java b/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITZookeeperDistributedLockTest.java
index 48d6736..e369ff0 100644
--- a/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITZookeeperDistributedLockTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITZookeeperDistributedLockTest.java
@@ -31,7 +31,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kylin.common.lock.DistributedLock;
import org.apache.kylin.common.lock.DistributedLock.Watcher;
import org.apache.kylin.common.util.HBaseMetadataTestCase;
-import org.apache.kylin.storage.hbase.util.ZookeeperDistributedLock;
+import org.apache.kylin.job.lock.zookeeper.ZookeeperDistributedLock;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
index 4f15d9e..31a4119 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -76,7 +76,7 @@ import org.apache.kylin.rest.util.AclEvaluate;
import org.apache.kylin.source.ISource;
import org.apache.kylin.source.SourceManager;
import org.apache.kylin.source.SourcePartition;
-import org.apache.kylin.storage.hbase.util.ZookeeperJobLock;
+import org.apache.kylin.job.lock.zookeeper.ZookeeperJobLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
@@ -120,6 +120,12 @@ public class JobService extends BasicService implements InitializingBean {
TimeZone.setDefault(tzone);
final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+
+ // If kylin.server.cluster-name is not set, getClusterName() has to be called first
+ // so that the cluster name is not affected by a change of kylin.metadata.url
+ String clusterName = kylinConfig.getClusterName();
+ logger.info("starting to initialize an instance in cluster {}", clusterName);
+
final Scheduler<AbstractExecutable> scheduler = (Scheduler<AbstractExecutable>) SchedulerFactory
.scheduler(kylinConfig.getSchedulerType());
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperUtil.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperUtil.java
deleted file mode 100644
index 20569d3..0000000
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperUtil.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.storage.hbase.util;
-
-import org.apache.kylin.common.KylinConfig;
-
-public class ZookeeperUtil {
-
- public static String ZOOKEEPER_UTIL_HBASE_CLASSNAME = "org.apache.kylin.storage.hbase.util.ZooKeeperUtilHbase";
-
- /**
- * Get zookeeper connection string from HBase Configuration or from kylin.properties
- */
- public static String getZKConnectString() {
- KylinConfig config = KylinConfig.getInstanceFromEnv();
- return config.getZookeeperConnectString();
- }
-}
\ No newline at end of file