You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/11/08 22:34:46 UTC
[5/6] kylin git commit: KYLIN-2006 refactor test case,
avoid conflict with default CI metadata
KYLIN-2006 refactor test case, avoid conflict with default CI metadata
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/b1e81d4e
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/b1e81d4e
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/b1e81d4e
Branch: refs/heads/master
Commit: b1e81d4e261a5e9367390e63f8148468fac9e001
Parents: 85c4ded
Author: Yang Li <li...@apache.org>
Authored: Tue Nov 8 22:30:23 2016 +0800
Committer: Yang Li <li...@apache.org>
Committed: Wed Nov 9 06:32:02 2016 +0800
----------------------------------------------------------------------
.../common/util/AbstractKylinTestCase.java | 4 +-
.../kylin/job/execution/AbstractExecutable.java | 4 +
.../kylin/job/execution/ExecutableManager.java | 4 +
.../impl/threadpool/DistributedScheduler.java | 6 +-
.../kylin/job/BaseTestDistributedScheduler.java | 121 +++++++++----------
.../apache/kylin/job/ContextTestExecutable.java | 9 +-
.../job/ITDistributedSchedulerBaseTest.java | 22 ++--
.../job/ITDistributedSchedulerTakeOverTest.java | 10 +-
.../hbase/util/ZookeeperDistributedJobLock.java | 42 ++++---
9 files changed, 115 insertions(+), 107 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/b1e81d4e/core-common/src/test/java/org/apache/kylin/common/util/AbstractKylinTestCase.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/util/AbstractKylinTestCase.java b/core-common/src/test/java/org/apache/kylin/common/util/AbstractKylinTestCase.java
index 14bf90b..2154c32 100644
--- a/core-common/src/test/java/org/apache/kylin/common/util/AbstractKylinTestCase.java
+++ b/core-common/src/test/java/org/apache/kylin/common/util/AbstractKylinTestCase.java
@@ -34,7 +34,9 @@ public abstract class AbstractKylinTestCase {
"org.apache.kylin.storage.hybrid.HybridManager", //
"org.apache.kylin.metadata.realization.RealizationRegistry", //
"org.apache.kylin.metadata.project.ProjectManager", //
- "org.apache.kylin.metadata.MetadataManager" //
+ "org.apache.kylin.metadata.MetadataManager", //
+ "org.apache.kylin.job.impl.threadpool.DistributedScheduler", //
+ "org.apache.kylin.job.manager.ExecutableManager", //
};
public abstract void createTestMetadata() throws Exception;
http://git-wip-us.apache.org/repos/asf/kylin/blob/b1e81d4e/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
index 2a4b2df..9292418 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
@@ -66,6 +66,10 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
this.config = config;
}
+ protected KylinConfig getConfig() {
+ return config;
+ }
+
protected ExecutableManager getManager() {
return ExecutableManager.getInstance(config);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/b1e81d4e/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
index 92fc8c9..1db612f 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
@@ -65,6 +65,10 @@ public class ExecutableManager {
return r;
}
+ public static void clearCache() {
+ CACHE.clear();
+ }
+
private ExecutableManager(KylinConfig config) {
logger.info("Using metadata url: " + config);
this.config = config;
http://git-wip-us.apache.org/repos/asf/kylin/blob/b1e81d4e/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
index 17df119..3937a24 100644
--- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
@@ -43,11 +43,11 @@ import org.apache.kylin.job.exception.SchedulerException;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
import org.apache.kylin.job.execution.Executable;
+import org.apache.kylin.job.execution.ExecutableManager;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.execution.Output;
import org.apache.kylin.job.lock.DistributedJobLock;
import org.apache.kylin.job.lock.JobLock;
-import org.apache.kylin.job.manager.ExecutableManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -98,6 +98,10 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
return r;
}
+ public static void clearCache() {
+ CACHE.clear();
+ }
+
private class FetcherRunner implements Runnable {
@Override
synchronized public void run() {
http://git-wip-us.apache.org/repos/asf/kylin/blob/b1e81d4e/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java b/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java
index c33f3da..910db49 100644
--- a/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java
+++ b/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java
@@ -18,9 +18,13 @@
package org.apache.kylin.job;
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
-import org.apache.commons.lang.StringUtils;
+import java.io.File;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+
+import javax.annotation.Nullable;
+
+import org.apache.commons.lang3.StringUtils;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -29,12 +33,12 @@ import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableManager;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.impl.threadpool.DistributedScheduler;
-import org.apache.kylin.job.manager.ExecutableManager;
import org.apache.kylin.storage.hbase.HBaseConnection;
import org.apache.kylin.storage.hbase.util.ZookeeperDistributedJobLock;
import org.junit.AfterClass;
@@ -42,14 +46,11 @@ import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.annotation.Nullable;
-import java.io.File;
-import java.nio.charset.Charset;
-import java.nio.file.Files;
-import java.util.Arrays;
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
-public class BaseTestDistributedScheduler {
- static ExecutableManager jobService;
+public class BaseTestDistributedScheduler extends HBaseMetadataTestCase {
+ static ExecutableManager execMgr;
static ZookeeperDistributedJobLock jobLock;
static DistributedScheduler scheduler1;
static DistributedScheduler scheduler2;
@@ -62,35 +63,38 @@ public class BaseTestDistributedScheduler {
static final String segmentId2 = "segmentId2";
static final String serverName1 = "serverName1";
static final String serverName2 = "serverName2";
- static final String ZOOKEEPER_LOCK_PATH = "/kylin/job_engine/lock";
static final String confSrcPath = "../examples/test_case_data/sandbox/kylin.properties";
- static final String confDstPath = "../examples/kylin.properties";
- static final String SANDBOX_TEST_DATA = "../examples/test_case_data/sandbox";
+ static final String confDstPath1 = "target/kylin_metadata_dist_lock_test1/kylin.properties";
+ static final String confDstPath2 = "target/kylin_metadata_dist_lock_test2/kylin.properties";
private static final Logger logger = LoggerFactory.getLogger(BaseTestDistributedScheduler.class);
- static {
- try {
- ClassUtil.addClasspath(new File(SANDBOX_TEST_DATA).getAbsolutePath());
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
@BeforeClass
public static void setup() throws Exception {
- staticCreateTestMetadata(SANDBOX_TEST_DATA);
+ staticCreateTestMetadata();
System.setProperty("kylin.job.controller.lock", "org.apache.kylin.storage.hbase.util.ZookeeperDistributedJobLock");
+ new File(confDstPath1).getParentFile().mkdirs();
+ new File(confDstPath2).getParentFile().mkdirs();
+ KylinConfig srcConfig = KylinConfig.getInstanceFromEnv();
+ String backup = srcConfig.getMetadataUrl();
+ srcConfig.setProperty("kylin.metadata.url", "kylin_metadata_dist_lock_test@hbase");
+ srcConfig.writeProperties(new File(confDstPath1));
+ srcConfig.writeProperties(new File(confDstPath2));
+ srcConfig.setProperty("kylin.metadata.url", backup);
+ kylinConfig1 = KylinConfig.createInstanceFromUri(new File(confDstPath1).getAbsolutePath());
+ kylinConfig2 = KylinConfig.createInstanceFromUri(new File(confDstPath2).getAbsolutePath());
+
initZk();
- kylinConfig1 = KylinConfig.getInstanceFromEnv();
- jobService = ExecutableManager.getInstance(kylinConfig1);
- for (String jobId : jobService.getAllJobIds()) {
- jobService.deleteJob(jobId);
+ if (jobLock == null)
+ jobLock = new ZookeeperDistributedJobLock(kylinConfig1);
+
+ execMgr = ExecutableManager.getInstance(kylinConfig1);
+ for (String jobId : execMgr.getAllJobIds()) {
+ execMgr.deleteJob(jobId);
}
- jobLock = new ZookeeperDistributedJobLock();
scheduler1 = DistributedScheduler.getInstance(kylinConfig1);
scheduler1.setServerName(serverName1);
scheduler1.init(new JobEngineConfig(kylinConfig1), jobLock);
@@ -98,11 +102,6 @@ public class BaseTestDistributedScheduler {
throw new RuntimeException("scheduler1 not started");
}
- String absoluteConfSrcPath = new File(confSrcPath).getAbsolutePath();
- String absoluteConfDstPath = new File(confDstPath).getAbsolutePath();
- copyFile(absoluteConfSrcPath, absoluteConfDstPath);
- kylinConfig2 = KylinConfig.createInstanceFromUri(absoluteConfDstPath);
-
scheduler2 = DistributedScheduler.getInstance(kylinConfig2);
scheduler2.setServerName(serverName2);
scheduler2.init(new JobEngineConfig(kylinConfig2), jobLock);
@@ -115,22 +114,30 @@ public class BaseTestDistributedScheduler {
@AfterClass
public static void after() throws Exception {
- System.clearProperty(KylinConfig.KYLIN_CONF);
+ if (scheduler1 != null) {
+ scheduler1.shutdown();
+ scheduler1 = null;
+ }
+ if (scheduler2 != null) {
+ scheduler2.shutdown();
+ scheduler2 = null;
+ }
+ if (jobLock != null) {
+ jobLock.close();
+ jobLock = null;
+ }
+ if (zkClient != null) {
+ zkClient.close();
+ zkClient = null;
+ }
+
System.clearProperty("kylin.job.controller.lock");
-
- deleteFile(confDstPath);
- }
-
- private static void staticCreateTestMetadata(String kylinConfigFolder) {
- KylinConfig.destroyInstance();
-
- if (System.getProperty(KylinConfig.KYLIN_CONF) == null && System.getenv(KylinConfig.KYLIN_CONF) == null)
- System.setProperty(KylinConfig.KYLIN_CONF, kylinConfigFolder);
+ staticCleanupTestMetadata();
}
void waitForJobFinish(String jobId) {
while (true) {
- AbstractExecutable job = jobService.getJob(jobId);
+ AbstractExecutable job = execMgr.getJob(jobId);
final ExecutableState status = job.getStatus();
if (status == ExecutableState.SUCCEED || status == ExecutableState.ERROR || status == ExecutableState.STOPPED || status == ExecutableState.DISCARDED) {
break;
@@ -146,7 +153,7 @@ public class BaseTestDistributedScheduler {
void waitForJobStatus(String jobId, ExecutableState state, long interval) {
while (true) {
- AbstractExecutable job = jobService.getJob(jobId);
+ AbstractExecutable job = execMgr.getJob(jobId);
if (state == job.getStatus()) {
break;
} else {
@@ -177,7 +184,7 @@ public class BaseTestDistributedScheduler {
Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
final String serverList = conf.get(HConstants.ZOOKEEPER_QUORUM);
final String port = conf.get(HConstants.ZOOKEEPER_CLIENT_PORT);
- return org.apache.commons.lang3.StringUtils.join(Iterables.transform(Arrays.asList(serverList.split(",")), new Function<String, String>() {
+ return StringUtils.join(Iterables.transform(Arrays.asList(serverList.split(",")), new Function<String, String>() {
@Nullable
@Override
public String apply(String input) {
@@ -203,24 +210,6 @@ public class BaseTestDistributedScheduler {
}
private String getLockPath(String pathName) {
- return ZOOKEEPER_LOCK_PATH + "/" + KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix() + "/" + pathName;
- }
-
- private static void copyFile(String srcPath, String dstPath) {
- try {
- File srcFile = new File(srcPath);
- File dstFile = new File(dstPath);
- Files.copy(srcFile.toPath(), dstFile.toPath());
- } catch (Exception e) {
- logger.error("copy the file failed", e);
- }
- }
-
- private static void deleteFile(String path) {
- try {
- Files.delete(new File(path).toPath());
- } catch (Exception e) {
- logger.error("delete the file failed", e);
- }
+ return ZookeeperDistributedJobLock.ZOOKEEPER_LOCK_PATH + "/" + kylinConfig1.getMetadataUrlPrefix() + "/" + pathName;
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/b1e81d4e/kylin-it/src/test/java/org/apache/kylin/job/ContextTestExecutable.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/job/ContextTestExecutable.java b/kylin-it/src/test/java/org/apache/kylin/job/ContextTestExecutable.java
index 052baad..4696e67 100644
--- a/kylin-it/src/test/java/org/apache/kylin/job/ContextTestExecutable.java
+++ b/kylin-it/src/test/java/org/apache/kylin/job/ContextTestExecutable.java
@@ -18,12 +18,10 @@
package org.apache.kylin.job;
-import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableContext;
import org.apache.kylin.job.execution.ExecuteResult;
-import org.apache.kylin.job.impl.threadpool.DefaultContext;
public class ContextTestExecutable extends AbstractExecutable {
public ContextTestExecutable() {
@@ -33,19 +31,14 @@ public class ContextTestExecutable extends AbstractExecutable {
@Override
protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
- DefaultContext defaultContext = (DefaultContext) context;
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
- if (getHashCode(defaultContext.getConfig()) == getHashCode(KylinConfig.getInstanceFromEnv())) {
+ if (context.getConfig() == BaseTestDistributedScheduler.kylinConfig1) {
return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
} else {
return new ExecuteResult(ExecuteResult.State.ERROR, "error");
}
}
-
- private int getHashCode(KylinConfig config) {
- return System.identityHashCode(config);
- }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/b1e81d4e/kylin-it/src/test/java/org/apache/kylin/job/ITDistributedSchedulerBaseTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/job/ITDistributedSchedulerBaseTest.java b/kylin-it/src/test/java/org/apache/kylin/job/ITDistributedSchedulerBaseTest.java
index 443e73b..0d5e011 100644
--- a/kylin-it/src/test/java/org/apache/kylin/job/ITDistributedSchedulerBaseTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/job/ITDistributedSchedulerBaseTest.java
@@ -42,16 +42,16 @@ public class ITDistributedSchedulerBaseTest extends BaseTestDistributedScheduler
job.addTask(task1);
job.addTask(task2);
job.addTask(task3);
- jobService.addJob(job);
+ execMgr.addJob(job);
Assert.assertEquals(serverName1, getServerName(segmentId1));
waitForJobFinish(job.getId());
- Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState());
- Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task2.getId()).getState());
- Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task3.getId()).getState());
- Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(job.getId()).getState());
+ Assert.assertEquals(ExecutableState.SUCCEED, execMgr.getOutput(task1.getId()).getState());
+ Assert.assertEquals(ExecutableState.SUCCEED, execMgr.getOutput(task2.getId()).getState());
+ Assert.assertEquals(ExecutableState.SUCCEED, execMgr.getOutput(task3.getId()).getState());
+ Assert.assertEquals(ExecutableState.SUCCEED, execMgr.getOutput(job.getId()).getState());
Assert.assertEquals(null, getServerName(segmentId1));
}
@@ -66,11 +66,11 @@ public class ITDistributedSchedulerBaseTest extends BaseTestDistributedScheduler
ContextTestExecutable task1 = new ContextTestExecutable();
task1.setParam(SEGMENT_ID, segmentId2);
job.addTask(task1);
- jobService.addJob(job);
+ execMgr.addJob(job);
waitForJobFinish(job.getId());
- Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState());
- Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(job.getId()).getState());
+ Assert.assertEquals(ExecutableState.SUCCEED, execMgr.getOutput(task1.getId()).getState());
+ Assert.assertEquals(ExecutableState.SUCCEED, execMgr.getOutput(job.getId()).getState());
if (!lock(jobLock, segmentId2, serverName2)) {
throw new JobException("fail to get the lock");
@@ -81,10 +81,10 @@ public class ITDistributedSchedulerBaseTest extends BaseTestDistributedScheduler
ContextTestExecutable task2 = new ContextTestExecutable();
task2.setParam(SEGMENT_ID, segmentId2);
job2.addTask(task2);
- jobService.addJob(job2);
+ execMgr.addJob(job2);
waitForJobFinish(job2.getId());
- Assert.assertEquals(ExecutableState.ERROR, jobService.getOutput(task2.getId()).getState());
- Assert.assertEquals(ExecutableState.ERROR, jobService.getOutput(job2.getId()).getState());
+ Assert.assertEquals(ExecutableState.ERROR, execMgr.getOutput(task2.getId()).getState());
+ Assert.assertEquals(ExecutableState.ERROR, execMgr.getOutput(job2.getId()).getState());
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/b1e81d4e/kylin-it/src/test/java/org/apache/kylin/job/ITDistributedSchedulerTakeOverTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/job/ITDistributedSchedulerTakeOverTest.java b/kylin-it/src/test/java/org/apache/kylin/job/ITDistributedSchedulerTakeOverTest.java
index 3137aef..2b15ddd 100644
--- a/kylin-it/src/test/java/org/apache/kylin/job/ITDistributedSchedulerTakeOverTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/job/ITDistributedSchedulerTakeOverTest.java
@@ -43,7 +43,7 @@ public class ITDistributedSchedulerTakeOverTest extends BaseTestDistributedSched
job.addTask(task1);
job.addTask(task2);
job.addTask(task3);
- jobService.addJob(job);
+ execMgr.addJob(job);
waitForJobStatus(job.getId(), ExecutableState.RUNNING, 500);
@@ -52,9 +52,9 @@ public class ITDistributedSchedulerTakeOverTest extends BaseTestDistributedSched
waitForJobFinish(job.getId());
- Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState());
- Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task2.getId()).getState());
- Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task3.getId()).getState());
- Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(job.getId()).getState());
+ Assert.assertEquals(ExecutableState.SUCCEED, execMgr.getOutput(task1.getId()).getState());
+ Assert.assertEquals(ExecutableState.SUCCEED, execMgr.getOutput(task2.getId()).getState());
+ Assert.assertEquals(ExecutableState.SUCCEED, execMgr.getOutput(task3.getId()).getState());
+ Assert.assertEquals(ExecutableState.SUCCEED, execMgr.getOutput(job.getId()).getState());
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/b1e81d4e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java
index d8d27c5..613d783 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java
@@ -24,7 +24,7 @@ import java.util.concurrent.ExecutorService;
import javax.annotation.Nullable;
-import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -52,11 +52,19 @@ import com.google.common.collect.Iterables;
public class ZookeeperDistributedJobLock implements DistributedJobLock {
private static Logger logger = LoggerFactory.getLogger(ZookeeperDistributedJobLock.class);
- private static final String ZOOKEEPER_LOCK_PATH = "/kylin/job_engine/lock";
- private static CuratorFramework zkClient;
- private static PathChildrenCache childrenCache;
+ public static final String ZOOKEEPER_LOCK_PATH = "/kylin/job_engine/lock";
+
+ final private KylinConfig config;
+ final CuratorFramework zkClient;
+ final PathChildrenCache childrenCache;
+
+ public ZookeeperDistributedJobLock() {
+ this(KylinConfig.getInstanceFromEnv());
+ }
+
+ public ZookeeperDistributedJobLock(KylinConfig config) {
+ this.config = config;
- static {
String zkConnectString = getZKConnectString();
logger.info("zk connection string:" + zkConnectString);
if (StringUtils.isEmpty(zkConnectString)) {
@@ -71,12 +79,7 @@ public class ZookeeperDistributedJobLock implements DistributedJobLock {
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
- try {
- childrenCache.close();
- zkClient.close();
- } catch (Exception e) {
- logger.error("error occurred to close PathChildrenCache", e);
- }
+ close();
}
}));
}
@@ -200,7 +203,7 @@ public class ZookeeperDistributedJobLock implements DistributedJobLock {
Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
final String serverList = conf.get(HConstants.ZOOKEEPER_QUORUM);
final String port = conf.get(HConstants.ZOOKEEPER_CLIENT_PORT);
- return org.apache.commons.lang3.StringUtils.join(Iterables.transform(Arrays.asList(serverList.split(",")), new Function<String, String>() {
+ return StringUtils.join(Iterables.transform(Arrays.asList(serverList.split(",")), new Function<String, String>() {
@Nullable
@Override
public String apply(String input) {
@@ -210,11 +213,11 @@ public class ZookeeperDistributedJobLock implements DistributedJobLock {
}
private String getLockPath(String pathName) {
- return ZOOKEEPER_LOCK_PATH + "/" + KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix() + "/" + pathName;
+ return ZOOKEEPER_LOCK_PATH + "/" + config.getMetadataUrlPrefix() + "/" + pathName;
}
- private static String getWatchPath() {
- return ZOOKEEPER_LOCK_PATH + "/" + KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix();
+ private String getWatchPath() {
+ return ZOOKEEPER_LOCK_PATH + "/" + config.getMetadataUrlPrefix();
}
@Override
@@ -226,4 +229,13 @@ public class ZookeeperDistributedJobLock implements DistributedJobLock {
public void unlock() {
}
+
+ public void close() {
+ try {
+ childrenCache.close();
+ zkClient.close();
+ } catch (Exception e) {
+ logger.error("error occurred to close PathChildrenCache", e);
+ }
+ }
}