You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/11/08 22:34:46 UTC

[5/6] kylin git commit: KYLIN-2006 refactor test case, avoid conflict with default CI metadata

KYLIN-2006 refactor test case, avoid conflict with default CI metadata


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/b1e81d4e
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/b1e81d4e
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/b1e81d4e

Branch: refs/heads/master
Commit: b1e81d4e261a5e9367390e63f8148468fac9e001
Parents: 85c4ded
Author: Yang Li <li...@apache.org>
Authored: Tue Nov 8 22:30:23 2016 +0800
Committer: Yang Li <li...@apache.org>
Committed: Wed Nov 9 06:32:02 2016 +0800

----------------------------------------------------------------------
 .../common/util/AbstractKylinTestCase.java      |   4 +-
 .../kylin/job/execution/AbstractExecutable.java |   4 +
 .../kylin/job/execution/ExecutableManager.java  |   4 +
 .../impl/threadpool/DistributedScheduler.java   |   6 +-
 .../kylin/job/BaseTestDistributedScheduler.java | 121 +++++++++----------
 .../apache/kylin/job/ContextTestExecutable.java |   9 +-
 .../job/ITDistributedSchedulerBaseTest.java     |  22 ++--
 .../job/ITDistributedSchedulerTakeOverTest.java |  10 +-
 .../hbase/util/ZookeeperDistributedJobLock.java |  42 ++++---
 9 files changed, 115 insertions(+), 107 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/b1e81d4e/core-common/src/test/java/org/apache/kylin/common/util/AbstractKylinTestCase.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/util/AbstractKylinTestCase.java b/core-common/src/test/java/org/apache/kylin/common/util/AbstractKylinTestCase.java
index 14bf90b..2154c32 100644
--- a/core-common/src/test/java/org/apache/kylin/common/util/AbstractKylinTestCase.java
+++ b/core-common/src/test/java/org/apache/kylin/common/util/AbstractKylinTestCase.java
@@ -34,7 +34,9 @@ public abstract class AbstractKylinTestCase {
             "org.apache.kylin.storage.hybrid.HybridManager", //
             "org.apache.kylin.metadata.realization.RealizationRegistry", //
             "org.apache.kylin.metadata.project.ProjectManager", //
-            "org.apache.kylin.metadata.MetadataManager" //
+            "org.apache.kylin.metadata.MetadataManager", //
+            "org.apache.kylin.job.impl.threadpool.DistributedScheduler", //
+            "org.apache.kylin.job.manager.ExecutableManager", //
     };
 
     public abstract void createTestMetadata() throws Exception;

http://git-wip-us.apache.org/repos/asf/kylin/blob/b1e81d4e/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
index 2a4b2df..9292418 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
@@ -66,6 +66,10 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
         this.config = config;
     }
     
+    protected KylinConfig getConfig() {
+        return config;
+    }
+    
     protected ExecutableManager getManager() {
         return ExecutableManager.getInstance(config);
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/b1e81d4e/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
index 92fc8c9..1db612f 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
@@ -65,6 +65,10 @@ public class ExecutableManager {
         return r;
     }
 
+    public static void clearCache() {
+        CACHE.clear();
+    }
+
     private ExecutableManager(KylinConfig config) {
         logger.info("Using metadata url: " + config);
         this.config = config;

http://git-wip-us.apache.org/repos/asf/kylin/blob/b1e81d4e/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
index 17df119..3937a24 100644
--- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
@@ -43,11 +43,11 @@ import org.apache.kylin.job.exception.SchedulerException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
 import org.apache.kylin.job.execution.Executable;
+import org.apache.kylin.job.execution.ExecutableManager;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.execution.Output;
 import org.apache.kylin.job.lock.DistributedJobLock;
 import org.apache.kylin.job.lock.JobLock;
-import org.apache.kylin.job.manager.ExecutableManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -98,6 +98,10 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
         return r;
     }
 
+    public static void clearCache() {
+        CACHE.clear();
+    }
+
     private class FetcherRunner implements Runnable {
         @Override
         synchronized public void run() {

http://git-wip-us.apache.org/repos/asf/kylin/blob/b1e81d4e/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java b/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java
index c33f3da..910db49 100644
--- a/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java
+++ b/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java
@@ -18,9 +18,13 @@
 
 package org.apache.kylin.job;
 
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
-import org.apache.commons.lang.StringUtils;
+import java.io.File;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+
+import javax.annotation.Nullable;
+
+import org.apache.commons.lang3.StringUtils;
 import org.apache.curator.RetryPolicy;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -29,12 +33,12 @@ import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
 import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableManager;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.impl.threadpool.DistributedScheduler;
-import org.apache.kylin.job.manager.ExecutableManager;
 import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.apache.kylin.storage.hbase.util.ZookeeperDistributedJobLock;
 import org.junit.AfterClass;
@@ -42,14 +46,11 @@ import org.junit.BeforeClass;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nullable;
-import java.io.File;
-import java.nio.charset.Charset;
-import java.nio.file.Files;
-import java.util.Arrays;
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
 
-public class BaseTestDistributedScheduler {
-    static ExecutableManager jobService;
+public class BaseTestDistributedScheduler extends HBaseMetadataTestCase {
+    static ExecutableManager execMgr;
     static ZookeeperDistributedJobLock jobLock;
     static DistributedScheduler scheduler1;
     static DistributedScheduler scheduler2;
@@ -62,35 +63,38 @@ public class BaseTestDistributedScheduler {
     static final String segmentId2 = "segmentId2";
     static final String serverName1 = "serverName1";
     static final String serverName2 = "serverName2";
-    static final String ZOOKEEPER_LOCK_PATH = "/kylin/job_engine/lock";
     static final String confSrcPath = "../examples/test_case_data/sandbox/kylin.properties";
-    static final String confDstPath = "../examples/kylin.properties";
-    static final String SANDBOX_TEST_DATA = "../examples/test_case_data/sandbox";
+    static final String confDstPath1 = "target/kylin_metadata_dist_lock_test1/kylin.properties";
+    static final String confDstPath2 = "target/kylin_metadata_dist_lock_test2/kylin.properties";
 
     private static final Logger logger = LoggerFactory.getLogger(BaseTestDistributedScheduler.class);
 
-    static {
-        try {
-            ClassUtil.addClasspath(new File(SANDBOX_TEST_DATA).getAbsolutePath());
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-    }
-
     @BeforeClass
     public static void setup() throws Exception {
-        staticCreateTestMetadata(SANDBOX_TEST_DATA);
+        staticCreateTestMetadata();
         System.setProperty("kylin.job.controller.lock", "org.apache.kylin.storage.hbase.util.ZookeeperDistributedJobLock");
 
+        new File(confDstPath1).getParentFile().mkdirs();
+        new File(confDstPath2).getParentFile().mkdirs();
+        KylinConfig srcConfig = KylinConfig.getInstanceFromEnv();
+        String backup = srcConfig.getMetadataUrl();
+        srcConfig.setProperty("kylin.metadata.url", "kylin_metadata_dist_lock_test@hbase");
+        srcConfig.writeProperties(new File(confDstPath1));
+        srcConfig.writeProperties(new File(confDstPath2));
+        srcConfig.setProperty("kylin.metadata.url", backup);
+        kylinConfig1 = KylinConfig.createInstanceFromUri(new File(confDstPath1).getAbsolutePath());
+        kylinConfig2 = KylinConfig.createInstanceFromUri(new File(confDstPath2).getAbsolutePath());
+        
         initZk();
 
-        kylinConfig1 = KylinConfig.getInstanceFromEnv();
-        jobService = ExecutableManager.getInstance(kylinConfig1);
-        for (String jobId : jobService.getAllJobIds()) {
-            jobService.deleteJob(jobId);
+        if (jobLock == null)
+            jobLock = new ZookeeperDistributedJobLock(kylinConfig1);
+
+        execMgr = ExecutableManager.getInstance(kylinConfig1);
+        for (String jobId : execMgr.getAllJobIds()) {
+            execMgr.deleteJob(jobId);
         }
 
-        jobLock = new ZookeeperDistributedJobLock();
         scheduler1 = DistributedScheduler.getInstance(kylinConfig1);
         scheduler1.setServerName(serverName1);
         scheduler1.init(new JobEngineConfig(kylinConfig1), jobLock);
@@ -98,11 +102,6 @@ public class BaseTestDistributedScheduler {
             throw new RuntimeException("scheduler1 not started");
         }
 
-        String absoluteConfSrcPath = new File(confSrcPath).getAbsolutePath();
-        String absoluteConfDstPath = new File(confDstPath).getAbsolutePath();
-        copyFile(absoluteConfSrcPath, absoluteConfDstPath);
-        kylinConfig2 = KylinConfig.createInstanceFromUri(absoluteConfDstPath);
-
         scheduler2 = DistributedScheduler.getInstance(kylinConfig2);
         scheduler2.setServerName(serverName2);
         scheduler2.init(new JobEngineConfig(kylinConfig2), jobLock);
@@ -115,22 +114,30 @@ public class BaseTestDistributedScheduler {
 
     @AfterClass
     public static void after() throws Exception {
-        System.clearProperty(KylinConfig.KYLIN_CONF);
+        if (scheduler1 != null) {
+            scheduler1.shutdown();
+            scheduler1 = null;
+        }
+        if (scheduler2 != null) {
+            scheduler2.shutdown();
+            scheduler2 = null;
+        }
+        if (jobLock != null) {
+            jobLock.close();
+            jobLock = null;
+        }
+        if (zkClient != null) {
+            zkClient.close();
+            zkClient = null;
+        }
+        
         System.clearProperty("kylin.job.controller.lock");
-
-        deleteFile(confDstPath);
-    }
-
-    private static void staticCreateTestMetadata(String kylinConfigFolder) {
-        KylinConfig.destroyInstance();
-
-        if (System.getProperty(KylinConfig.KYLIN_CONF) == null && System.getenv(KylinConfig.KYLIN_CONF) == null)
-            System.setProperty(KylinConfig.KYLIN_CONF, kylinConfigFolder);
+        staticCleanupTestMetadata();
     }
 
     void waitForJobFinish(String jobId) {
         while (true) {
-            AbstractExecutable job = jobService.getJob(jobId);
+            AbstractExecutable job = execMgr.getJob(jobId);
             final ExecutableState status = job.getStatus();
             if (status == ExecutableState.SUCCEED || status == ExecutableState.ERROR || status == ExecutableState.STOPPED || status == ExecutableState.DISCARDED) {
                 break;
@@ -146,7 +153,7 @@ public class BaseTestDistributedScheduler {
 
     void waitForJobStatus(String jobId, ExecutableState state, long interval) {
         while (true) {
-            AbstractExecutable job = jobService.getJob(jobId);
+            AbstractExecutable job = execMgr.getJob(jobId);
             if (state == job.getStatus()) {
                 break;
             } else {
@@ -177,7 +184,7 @@ public class BaseTestDistributedScheduler {
         Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
         final String serverList = conf.get(HConstants.ZOOKEEPER_QUORUM);
         final String port = conf.get(HConstants.ZOOKEEPER_CLIENT_PORT);
-        return org.apache.commons.lang3.StringUtils.join(Iterables.transform(Arrays.asList(serverList.split(",")), new Function<String, String>() {
+        return StringUtils.join(Iterables.transform(Arrays.asList(serverList.split(",")), new Function<String, String>() {
             @Nullable
             @Override
             public String apply(String input) {
@@ -203,24 +210,6 @@ public class BaseTestDistributedScheduler {
     }
 
     private String getLockPath(String pathName) {
-        return ZOOKEEPER_LOCK_PATH + "/" + KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix() + "/" + pathName;
-    }
-
-    private static void copyFile(String srcPath, String dstPath) {
-        try {
-            File srcFile = new File(srcPath);
-            File dstFile = new File(dstPath);
-            Files.copy(srcFile.toPath(), dstFile.toPath());
-        } catch (Exception e) {
-            logger.error("copy the file failed", e);
-        }
-    }
-
-    private static void deleteFile(String path) {
-        try {
-            Files.delete(new File(path).toPath());
-        } catch (Exception e) {
-            logger.error("delete the file failed", e);
-        }
+        return ZookeeperDistributedJobLock.ZOOKEEPER_LOCK_PATH + "/" + kylinConfig1.getMetadataUrlPrefix() + "/" + pathName;
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/b1e81d4e/kylin-it/src/test/java/org/apache/kylin/job/ContextTestExecutable.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/job/ContextTestExecutable.java b/kylin-it/src/test/java/org/apache/kylin/job/ContextTestExecutable.java
index 052baad..4696e67 100644
--- a/kylin-it/src/test/java/org/apache/kylin/job/ContextTestExecutable.java
+++ b/kylin-it/src/test/java/org/apache/kylin/job/ContextTestExecutable.java
@@ -18,12 +18,10 @@
 
 package org.apache.kylin.job;
 
-import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableContext;
 import org.apache.kylin.job.execution.ExecuteResult;
-import org.apache.kylin.job.impl.threadpool.DefaultContext;
 
 public class ContextTestExecutable extends AbstractExecutable {
     public ContextTestExecutable() {
@@ -33,19 +31,14 @@ public class ContextTestExecutable extends AbstractExecutable {
     @Override
     protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
 
-        DefaultContext defaultContext = (DefaultContext) context;
         try {
             Thread.sleep(1000);
         } catch (InterruptedException e) {
         }
-        if (getHashCode(defaultContext.getConfig()) == getHashCode(KylinConfig.getInstanceFromEnv())) {
+        if (context.getConfig() == BaseTestDistributedScheduler.kylinConfig1) {
             return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
         } else {
             return new ExecuteResult(ExecuteResult.State.ERROR, "error");
         }
     }
-
-    private int getHashCode(KylinConfig config) {
-        return System.identityHashCode(config);
-    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/b1e81d4e/kylin-it/src/test/java/org/apache/kylin/job/ITDistributedSchedulerBaseTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/job/ITDistributedSchedulerBaseTest.java b/kylin-it/src/test/java/org/apache/kylin/job/ITDistributedSchedulerBaseTest.java
index 443e73b..0d5e011 100644
--- a/kylin-it/src/test/java/org/apache/kylin/job/ITDistributedSchedulerBaseTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/job/ITDistributedSchedulerBaseTest.java
@@ -42,16 +42,16 @@ public class ITDistributedSchedulerBaseTest extends BaseTestDistributedScheduler
         job.addTask(task1);
         job.addTask(task2);
         job.addTask(task3);
-        jobService.addJob(job);
+        execMgr.addJob(job);
 
         Assert.assertEquals(serverName1, getServerName(segmentId1));
 
         waitForJobFinish(job.getId());
 
-        Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState());
-        Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task2.getId()).getState());
-        Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task3.getId()).getState());
-        Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(job.getId()).getState());
+        Assert.assertEquals(ExecutableState.SUCCEED, execMgr.getOutput(task1.getId()).getState());
+        Assert.assertEquals(ExecutableState.SUCCEED, execMgr.getOutput(task2.getId()).getState());
+        Assert.assertEquals(ExecutableState.SUCCEED, execMgr.getOutput(task3.getId()).getState());
+        Assert.assertEquals(ExecutableState.SUCCEED, execMgr.getOutput(job.getId()).getState());
 
         Assert.assertEquals(null, getServerName(segmentId1));
     }
@@ -66,11 +66,11 @@ public class ITDistributedSchedulerBaseTest extends BaseTestDistributedScheduler
         ContextTestExecutable task1 = new ContextTestExecutable();
         task1.setParam(SEGMENT_ID, segmentId2);
         job.addTask(task1);
-        jobService.addJob(job);
+        execMgr.addJob(job);
 
         waitForJobFinish(job.getId());
-        Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState());
-        Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(job.getId()).getState());
+        Assert.assertEquals(ExecutableState.SUCCEED, execMgr.getOutput(task1.getId()).getState());
+        Assert.assertEquals(ExecutableState.SUCCEED, execMgr.getOutput(job.getId()).getState());
 
         if (!lock(jobLock, segmentId2, serverName2)) {
             throw new JobException("fail to get the lock");
@@ -81,10 +81,10 @@ public class ITDistributedSchedulerBaseTest extends BaseTestDistributedScheduler
         ContextTestExecutable task2 = new ContextTestExecutable();
         task2.setParam(SEGMENT_ID, segmentId2);
         job2.addTask(task2);
-        jobService.addJob(job2);
+        execMgr.addJob(job2);
 
         waitForJobFinish(job2.getId());
-        Assert.assertEquals(ExecutableState.ERROR, jobService.getOutput(task2.getId()).getState());
-        Assert.assertEquals(ExecutableState.ERROR, jobService.getOutput(job2.getId()).getState());
+        Assert.assertEquals(ExecutableState.ERROR, execMgr.getOutput(task2.getId()).getState());
+        Assert.assertEquals(ExecutableState.ERROR, execMgr.getOutput(job2.getId()).getState());
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/b1e81d4e/kylin-it/src/test/java/org/apache/kylin/job/ITDistributedSchedulerTakeOverTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/job/ITDistributedSchedulerTakeOverTest.java b/kylin-it/src/test/java/org/apache/kylin/job/ITDistributedSchedulerTakeOverTest.java
index 3137aef..2b15ddd 100644
--- a/kylin-it/src/test/java/org/apache/kylin/job/ITDistributedSchedulerTakeOverTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/job/ITDistributedSchedulerTakeOverTest.java
@@ -43,7 +43,7 @@ public class ITDistributedSchedulerTakeOverTest extends BaseTestDistributedSched
         job.addTask(task1);
         job.addTask(task2);
         job.addTask(task3);
-        jobService.addJob(job);
+        execMgr.addJob(job);
 
         waitForJobStatus(job.getId(), ExecutableState.RUNNING, 500);
 
@@ -52,9 +52,9 @@ public class ITDistributedSchedulerTakeOverTest extends BaseTestDistributedSched
 
         waitForJobFinish(job.getId());
 
-        Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState());
-        Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task2.getId()).getState());
-        Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task3.getId()).getState());
-        Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(job.getId()).getState());
+        Assert.assertEquals(ExecutableState.SUCCEED, execMgr.getOutput(task1.getId()).getState());
+        Assert.assertEquals(ExecutableState.SUCCEED, execMgr.getOutput(task2.getId()).getState());
+        Assert.assertEquals(ExecutableState.SUCCEED, execMgr.getOutput(task3.getId()).getState());
+        Assert.assertEquals(ExecutableState.SUCCEED, execMgr.getOutput(job.getId()).getState());
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/b1e81d4e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java
index d8d27c5..613d783 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java
@@ -24,7 +24,7 @@ import java.util.concurrent.ExecutorService;
 
 import javax.annotation.Nullable;
 
-import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.curator.RetryPolicy;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -52,11 +52,19 @@ import com.google.common.collect.Iterables;
 public class ZookeeperDistributedJobLock implements DistributedJobLock {
     private static Logger logger = LoggerFactory.getLogger(ZookeeperDistributedJobLock.class);
 
-    private static final String ZOOKEEPER_LOCK_PATH = "/kylin/job_engine/lock";
-    private static CuratorFramework zkClient;
-    private static PathChildrenCache childrenCache;
+    public static final String ZOOKEEPER_LOCK_PATH = "/kylin/job_engine/lock";
+
+    final private KylinConfig config;
+    final CuratorFramework zkClient;
+    final PathChildrenCache childrenCache;
+
+    public ZookeeperDistributedJobLock() {
+        this(KylinConfig.getInstanceFromEnv());
+    }
+
+    public ZookeeperDistributedJobLock(KylinConfig config) {
+        this.config = config;
 
-    static {
         String zkConnectString = getZKConnectString();
         logger.info("zk connection string:" + zkConnectString);
         if (StringUtils.isEmpty(zkConnectString)) {
@@ -71,12 +79,7 @@ public class ZookeeperDistributedJobLock implements DistributedJobLock {
         Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
             @Override
             public void run() {
-                try {
-                    childrenCache.close();
-                    zkClient.close();
-                } catch (Exception e) {
-                    logger.error("error occurred to close PathChildrenCache", e);
-                }
+                close();
             }
         }));
     }
@@ -200,7 +203,7 @@ public class ZookeeperDistributedJobLock implements DistributedJobLock {
         Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
         final String serverList = conf.get(HConstants.ZOOKEEPER_QUORUM);
         final String port = conf.get(HConstants.ZOOKEEPER_CLIENT_PORT);
-        return org.apache.commons.lang3.StringUtils.join(Iterables.transform(Arrays.asList(serverList.split(",")), new Function<String, String>() {
+        return StringUtils.join(Iterables.transform(Arrays.asList(serverList.split(",")), new Function<String, String>() {
             @Nullable
             @Override
             public String apply(String input) {
@@ -210,11 +213,11 @@ public class ZookeeperDistributedJobLock implements DistributedJobLock {
     }
 
     private String getLockPath(String pathName) {
-        return ZOOKEEPER_LOCK_PATH + "/" + KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix() + "/" + pathName;
+        return ZOOKEEPER_LOCK_PATH + "/" + config.getMetadataUrlPrefix() + "/" + pathName;
     }
 
-    private static String getWatchPath() {
-        return ZOOKEEPER_LOCK_PATH + "/" + KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix();
+    private String getWatchPath() {
+        return ZOOKEEPER_LOCK_PATH + "/" + config.getMetadataUrlPrefix();
     }
 
     @Override
@@ -226,4 +229,13 @@ public class ZookeeperDistributedJobLock implements DistributedJobLock {
     public void unlock() {
 
     }
+    
+    public void close() {
+        try {
+            childrenCache.close();
+            zkClient.close();
+        } catch (Exception e) {
+            logger.error("error occurred to close PathChildrenCache", e);
+        }
+    }
 }