You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2017/04/16 11:43:28 UTC

[7/7] kylin git commit: KYLIN-2506 code review, split the role of DistributedLock and JobLock

KYLIN-2506 code review, split the role of DistributedLock and JobLock


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/06bf2a16
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/06bf2a16
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/06bf2a16

Branch: refs/heads/master-KYLIN-2506
Commit: 06bf2a16234b323ddf66c65b4056a5412db46143
Parents: 796d80f
Author: Yang Li <li...@apache.org>
Authored: Sun Apr 16 19:42:54 2017 +0800
Committer: Yang Li <li...@apache.org>
Committed: Sun Apr 16 19:42:54 2017 +0800

----------------------------------------------------------------------
 core-common/pom.xml                             |  5 ---
 .../apache/kylin/common/KylinConfigBase.java    |  7 ++++
 .../kylin/common/lock/DistributedJobLock.java   | 38 --------------------
 .../kylin/common/lock/DistributedLock.java      | 37 +++++++++++++++++++
 .../org/apache/kylin/common/lock/JobLock.java   | 26 --------------
 .../apache/kylin/common/lock/MockJobLock.java   | 33 -----------------
 .../kylin/dict/GlobalDictionaryBuilder.java     | 28 +++++++--------
 .../java/org/apache/kylin/job/Scheduler.java    |  2 +-
 .../job/impl/threadpool/DefaultScheduler.java   | 10 +++---
 .../impl/threadpool/DistributedScheduler.java   | 27 +++++++-------
 .../kylin/job/lock/DistributedJobLock.java      | 24 +++++++++++++
 .../java/org/apache/kylin/job/lock/JobLock.java | 30 ++++++++++++++++
 .../org/apache/kylin/job/lock/MockJobLock.java  | 33 +++++++++++++++++
 .../job/impl/threadpool/BaseSchedulerTest.java  |  2 +-
 .../test_case_data/localmeta/kylin.properties   |  2 +-
 .../test_case_data/sandbox/kylin.properties     |  2 +-
 .../kylin/job/BaseTestDistributedScheduler.java |  2 +-
 .../apache/kylin/rest/service/JobService.java   |  2 +-
 .../hbase/util/ZookeeperDistributedJobLock.java | 17 +++++----
 .../storage/hbase/util/ZookeeperJobLock.java    |  6 ++--
 20 files changed, 183 insertions(+), 150 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/06bf2a16/core-common/pom.xml
----------------------------------------------------------------------
diff --git a/core-common/pom.xml b/core-common/pom.xml
index 5b5f78b..95d3c29 100644
--- a/core-common/pom.xml
+++ b/core-common/pom.xml
@@ -69,11 +69,6 @@
             <scope>provided</scope>
         </dependency>
         <dependency>
-            <groupId>org.apache.curator</groupId>
-            <artifactId>curator-recipes</artifactId>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
             <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/kylin/blob/06bf2a16/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 4361242..bf6cdb8 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -29,6 +29,8 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.common.lock.DistributedLock;
+import org.apache.kylin.common.util.ClassUtil;
 import org.apache.kylin.common.util.CliCommandExecutor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -235,6 +237,11 @@ abstract public class KylinConfigBase implements Serializable {
     public Map<String, String> getCubeCustomMeasureTypes() {
         return getPropertiesByPrefix("kylin.metadata.custom-measure-types.");
     }
+    
+    public DistributedLock getDistributedLock() {
+        String clsName = getOptional("kylin.metadata.distributed-lock-impl", "org.apache.kylin.storage.hbase.util.ZookeeperDistributedJobLock");
+        return (DistributedLock) ClassUtil.newInstance(clsName);
+    }
 
     // ============================================================================
     // DICTIONARY & SNAPSHOT

http://git-wip-us.apache.org/repos/asf/kylin/blob/06bf2a16/core-common/src/main/java/org/apache/kylin/common/lock/DistributedJobLock.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/lock/DistributedJobLock.java b/core-common/src/main/java/org/apache/kylin/common/lock/DistributedJobLock.java
deleted file mode 100644
index 00d1ca4..0000000
--- a/core-common/src/main/java/org/apache/kylin/common/lock/DistributedJobLock.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.common.lock;
-
-import org.apache.curator.framework.recipes.cache.PathChildrenCache;
-
-import java.util.concurrent.Executor;
-
-public interface DistributedJobLock extends JobLock {
-
-    boolean lockWithClient(String lockPath, String lockClient);
-
-    boolean isHasLocked(String lockPath);
-
-    void unlock(String lockPath);
-
-    PathChildrenCache watch(String watchPath, Executor watchExecutor, WatcherProcess process);
-
-    public interface WatcherProcess {
-        void process(String path, String data);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/06bf2a16/core-common/src/main/java/org/apache/kylin/common/lock/DistributedLock.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/lock/DistributedLock.java b/core-common/src/main/java/org/apache/kylin/common/lock/DistributedLock.java
new file mode 100644
index 0000000..ead7714
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/lock/DistributedLock.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.lock;
+
+import java.io.Closeable;
+import java.util.concurrent.Executor;
+
+public interface DistributedLock extends Closeable {
+
+    boolean lockPath(String lockPath, String lockClient);
+
+    boolean isPathLocked(String lockPath);
+
+    void unlockPath(String lockPath);
+
+    Closeable watchPath(String watchPath, Executor watchExecutor, Watcher process);
+
+    public interface Watcher {
+        void process(String path, String data);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/06bf2a16/core-common/src/main/java/org/apache/kylin/common/lock/JobLock.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/lock/JobLock.java b/core-common/src/main/java/org/apache/kylin/common/lock/JobLock.java
deleted file mode 100644
index 5802d71..0000000
--- a/core-common/src/main/java/org/apache/kylin/common/lock/JobLock.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.common.lock;
-
-
-public interface JobLock {
-    boolean lock();
-
-    void unlock();
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/06bf2a16/core-common/src/main/java/org/apache/kylin/common/lock/MockJobLock.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/lock/MockJobLock.java b/core-common/src/main/java/org/apache/kylin/common/lock/MockJobLock.java
deleted file mode 100644
index f8233be..0000000
--- a/core-common/src/main/java/org/apache/kylin/common/lock/MockJobLock.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.common.lock;
-
-/**
- */
-public class MockJobLock implements JobLock {
-    @Override
-    public boolean lock() {
-        return true;
-    }
-
-    @Override
-    public void unlock() {
-        return;
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/06bf2a16/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java
index 9d66b12..f2ed375 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java
@@ -18,19 +18,19 @@
 
 package org.apache.kylin.dict;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 
-import com.google.common.util.concurrent.MoreExecutors;
-import org.apache.curator.framework.recipes.cache.PathChildrenCache;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.lock.DistributedJobLock;
-import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.lock.DistributedLock;
 import org.apache.kylin.common.util.Dictionary;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.util.concurrent.MoreExecutors;
+
 /**
  * GlobalDictinary based on whole cube, to ensure one value has same dict id in different segments.
  * GlobalDictinary mainly used for count distinct measure to support rollup among segments.
@@ -40,7 +40,7 @@ public class GlobalDictionaryBuilder implements IDictionaryBuilder {
     private AppendTrieDictionaryBuilder builder;
     private int baseId;
 
-    private DistributedJobLock lock;
+    private DistributedLock lock;
     private String sourceColumn;
     //the job thread name is UUID+threadID
     private final String jobUUID = Thread.currentThread().getName();
@@ -65,7 +65,7 @@ public class GlobalDictionaryBuilder implements IDictionaryBuilder {
     @Override
     public boolean addValue(String value) {
         if (++counter % 1_000_000 == 0) {
-            if (lock.lockWithClient(getLockPath(sourceColumn), jobUUID)) {
+            if (lock.lockPath(getLockPath(sourceColumn), jobUUID)) {
                 logger.info("processed {} values", counter);
             } else {
                 throw new RuntimeException("Failed to create global dictionary on " + sourceColumn + " This client doesn't keep the lock");
@@ -89,7 +89,7 @@ public class GlobalDictionaryBuilder implements IDictionaryBuilder {
     @Override
     public Dictionary<String> build() throws IOException {
         try {
-            if (lock.lockWithClient(getLockPath(sourceColumn), jobUUID)) {
+            if (lock.lockPath(getLockPath(sourceColumn), jobUUID)) {
                 return builder.build(baseId);
             }
         } finally {
@@ -99,17 +99,17 @@ public class GlobalDictionaryBuilder implements IDictionaryBuilder {
     }
 
     private void lock(final String sourceColumn) throws IOException {
-        lock = (DistributedJobLock) ClassUtil.newInstance("org.apache.kylin.storage.hbase.util.ZookeeperDistributedJobLock");
+        lock = KylinConfig.getInstanceFromEnv().getDistributedLock();
 
-        if (!lock.lockWithClient(getLockPath(sourceColumn), jobUUID)) {
+        if (!lock.lockPath(getLockPath(sourceColumn), jobUUID)) {
             logger.info("{} will wait the lock for {} ", jobUUID, sourceColumn);
 
             final BlockingQueue<String> bq = new ArrayBlockingQueue<String>(1);
 
-            PathChildrenCache cache = lock.watch(getWatchPath(sourceColumn), MoreExecutors.sameThreadExecutor(), new DistributedJobLock.WatcherProcess() {
+            Closeable watch = lock.watchPath(getWatchPath(sourceColumn), MoreExecutors.sameThreadExecutor(), new DistributedLock.Watcher() {
                 @Override
                 public void process(String path, String data) {
-                    if (!data.equalsIgnoreCase(jobUUID) && lock.lockWithClient(getLockPath(sourceColumn), jobUUID)) {
+                    if (!data.equalsIgnoreCase(jobUUID) && lock.lockPath(getLockPath(sourceColumn), jobUUID)) {
                         try {
                             bq.put("getLock");
                         } catch (InterruptedException e) {
@@ -126,7 +126,7 @@ public class GlobalDictionaryBuilder implements IDictionaryBuilder {
             } catch (InterruptedException e) {
                 throw new RuntimeException(e);
             } finally {
-                cache.close();
+                watch.close();
             }
 
             logger.info("{} has waited the lock {} ms for {} ", jobUUID, (System.currentTimeMillis() - start), sourceColumn);
@@ -134,8 +134,8 @@ public class GlobalDictionaryBuilder implements IDictionaryBuilder {
     }
 
     private void checkAndUnlock() {
-        if (lock.lockWithClient(getLockPath(sourceColumn), jobUUID)) {
-            lock.unlock(getLockPath(sourceColumn));
+        if (lock.lockPath(getLockPath(sourceColumn), jobUUID)) {
+            lock.unlockPath(getLockPath(sourceColumn));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/06bf2a16/core-job/src/main/java/org/apache/kylin/job/Scheduler.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/Scheduler.java b/core-job/src/main/java/org/apache/kylin/job/Scheduler.java
index e2cfd44..93d2510 100644
--- a/core-job/src/main/java/org/apache/kylin/job/Scheduler.java
+++ b/core-job/src/main/java/org/apache/kylin/job/Scheduler.java
@@ -21,7 +21,7 @@ package org.apache.kylin.job;
 import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.job.exception.SchedulerException;
 import org.apache.kylin.job.execution.Executable;
-import org.apache.kylin.common.lock.JobLock;
+import org.apache.kylin.job.lock.JobLock;
 
 /**
  */

http://git-wip-us.apache.org/repos/asf/kylin/blob/06bf2a16/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
index 688708e..8b6b5aa 100644
--- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
@@ -40,7 +40,7 @@ import org.apache.kylin.job.execution.Executable;
 import org.apache.kylin.job.execution.ExecutableManager;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.execution.Output;
-import org.apache.kylin.common.lock.JobLock;
+import org.apache.kylin.job.lock.JobLock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -187,8 +187,8 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti
     }
 
     @Override
-    public synchronized void init(JobEngineConfig jobEngineConfig, final JobLock jobLock) throws SchedulerException {
-        this.jobLock = jobLock;
+    public synchronized void init(JobEngineConfig jobEngineConfig, JobLock lock) throws SchedulerException {
+        jobLock = lock;
         
         String serverMode = jobEngineConfig.getConfig().getServerMode();
         if (!("job".equals(serverMode.toLowerCase()) || "all".equals(serverMode.toLowerCase()))) {
@@ -205,7 +205,7 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti
 
         this.jobEngineConfig = jobEngineConfig;
 
-        if (jobLock.lock() == false) {
+        if (jobLock.lockJobEngine() == false) {
             throw new IllegalStateException("Cannot start job scheduler due to lack of job lock");
         }
 
@@ -226,7 +226,7 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti
     @Override
     public void shutdown() throws SchedulerException {
         logger.info("Shutingdown Job Engine ....");
-        jobLock.unlock();
+        jobLock.unlockJobEngine();
         fetcherPool.shutdown();
         jobPool.shutdown();
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/06bf2a16/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
index c5b03dc..d544320 100644
--- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
@@ -18,6 +18,7 @@
 
 package org.apache.kylin.job.impl.threadpool;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
@@ -34,10 +35,10 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.recipes.cache.PathChildrenCache;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.lock.DistributedLock;
 import org.apache.kylin.common.util.SetThreadName;
 import org.apache.kylin.job.Scheduler;
 import org.apache.kylin.job.constant.ExecutableConstants;
@@ -50,8 +51,8 @@ import org.apache.kylin.job.execution.Executable;
 import org.apache.kylin.job.execution.ExecutableManager;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.execution.Output;
-import org.apache.kylin.common.lock.DistributedJobLock;
-import org.apache.kylin.common.lock.JobLock;
+import org.apache.kylin.job.lock.DistributedJobLock;
+import org.apache.kylin.job.lock.JobLock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -72,8 +73,8 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
     private ExecutorService watchPool;
     private ExecutorService jobPool;
     private DefaultContext context;
-    private DistributedJobLock jobLock;
-    private PathChildrenCache lockWatch;
+    private DistributedLock jobLock;
+    private Closeable lockWatch;
 
     private static final Logger logger = LoggerFactory.getLogger(DistributedScheduler.class);
     private static final ConcurrentMap<KylinConfig, DistributedScheduler> CACHE = new ConcurrentHashMap<KylinConfig, DistributedScheduler>();
@@ -181,7 +182,7 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
         public void run() {
             try (SetThreadName ignored = new SetThreadName("Job %s", executable.getId())) {
                 String segmentId = executable.getParam(SEGMENT_ID);
-                if (jobLock.lockWithClient(getLockPath(segmentId), serverName)) {
+                if (jobLock.lockPath(getLockPath(segmentId), serverName)) {
                     logger.info(executable.toString() + " scheduled in server: " + serverName);
 
                     context.addRunningJob(executable);
@@ -209,7 +210,7 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
                 if (state != ExecutableState.READY && state != ExecutableState.RUNNING) {
                     if (segmentWithLocks.contains(segmentId)) {
                         logger.info(executable.toString() + " will release the lock for the segment: " + segmentId);
-                        jobLock.unlock(getLockPath(segmentId));
+                        jobLock.unlockPath(getLockPath(segmentId));
                         segmentWithLocks.remove(segmentId);
                     }
                 }
@@ -218,7 +219,7 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
     }
 
     //when the segment lock released but the segment related job still running, resume the job.
-    private class WatcherProcessImpl implements org.apache.kylin.common.lock.DistributedJobLock.WatcherProcess {
+    private class WatcherProcessImpl implements org.apache.kylin.common.lock.DistributedLock.Watcher {
         private String serverName;
 
         public WatcherProcessImpl(String serverName) {
@@ -237,7 +238,7 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
                     if (executable instanceof DefaultChainedExecutable && executable.getParams().get(SEGMENT_ID).equalsIgnoreCase(segmentId) && !nodeData.equalsIgnoreCase(serverName)) {
                         try {
                             logger.warn(nodeData + " has released the lock for: " + segmentId + " but the job still running. so " + serverName + " resume the job");
-                            if (!jobLock.isHasLocked(getLockPath(segmentId))) {
+                            if (!jobLock.isPathLocked(getLockPath(segmentId))) {
                                 executableManager.resumeRunningJobForce(executable.getId());
                                 fetcherPool.schedule(fetcher, 0, TimeUnit.SECONDS);
                                 break;
@@ -264,7 +265,7 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
     }
 
     @Override
-    public synchronized void init(JobEngineConfig jobEngineConfig, final JobLock jobLock) throws SchedulerException {
+    public synchronized void init(JobEngineConfig jobEngineConfig, JobLock jobLock) throws SchedulerException {
         String serverMode = jobEngineConfig.getConfig().getServerMode();
         if (!("job".equals(serverMode.toLowerCase()) || "all".equals(serverMode.toLowerCase()))) {
             logger.info("server mode: " + serverMode + ", no need to run job scheduler");
@@ -288,7 +289,7 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
         //watch the zookeeper node change, so that when one job server is down, other job servers can take over.
         watchPool = Executors.newFixedThreadPool(1);
         WatcherProcessImpl watcherProcess = new WatcherProcessImpl(this.serverName);
-        lockWatch = this.jobLock.watch(getWatchPath(), watchPool, watcherProcess);
+        lockWatch = this.jobLock.watchPath(getWatchPath(), watchPool, watcherProcess);
 
         int corePoolSize = jobEngineConfig.getMaxConcurrentJobLimit();
         jobPool = new ThreadPoolExecutor(corePoolSize, corePoolSize, Long.MAX_VALUE, TimeUnit.DAYS, new SynchronousQueue<Runnable>());
@@ -307,7 +308,7 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
             AbstractExecutable executable = executableManager.getJob(id);
             if (output.getState() == ExecutableState.RUNNING && executable instanceof DefaultChainedExecutable) {
                 try {
-                    if (!jobLock.isHasLocked(executable.getParam(SEGMENT_ID))) {
+                    if (!jobLock.isPathLocked(executable.getParam(SEGMENT_ID))) {
                         executableManager.resumeRunningJobForce(executable.getId());
                         fetcherPool.schedule(fetcher, 0, TimeUnit.SECONDS);
                     }
@@ -348,7 +349,7 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
 
     private void releaseAllLocks() {
         for (String segmentId : segmentWithLocks) {
-            jobLock.unlock(getLockPath(segmentId));
+            jobLock.unlockPath(getLockPath(segmentId));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/06bf2a16/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java b/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java
new file mode 100644
index 0000000..e5e2a1e
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.job.lock;
+
+import org.apache.kylin.common.lock.DistributedLock;
+
+public interface DistributedJobLock extends JobLock, DistributedLock {
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/06bf2a16/core-job/src/main/java/org/apache/kylin/job/lock/JobLock.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/lock/JobLock.java b/core-job/src/main/java/org/apache/kylin/job/lock/JobLock.java
new file mode 100644
index 0000000..1b6b29e
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/lock/JobLock.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.job.lock;
+
+/**
+ * Within a Kylin cluster, usually only one node runs as the job engine and schedules the build jobs.
+ * Nodes use this interface to negotiate which of them takes on that role.
+ */
+public interface JobLock {
+    
+    boolean lockJobEngine();
+
+    void unlockJobEngine();
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/06bf2a16/core-job/src/main/java/org/apache/kylin/job/lock/MockJobLock.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/lock/MockJobLock.java b/core-job/src/main/java/org/apache/kylin/job/lock/MockJobLock.java
new file mode 100644
index 0000000..73f6192
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/lock/MockJobLock.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.job.lock;
+
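+/** A {@link JobLock} that does no real locking: lockJobEngine() always returns true and unlockJobEngine() does nothing.
+ */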
+public class MockJobLock implements JobLock {
+    @Override
+    public boolean lockJobEngine() {
+        return true;
+    }
+
+    @Override
+    public void unlockJobEngine() {
+        return;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/06bf2a16/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
----------------------------------------------------------------------
diff --git a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
index 1bafa34..1ada9a1 100644
--- a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
+++ b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
@@ -28,7 +28,7 @@ import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableManager;
 import org.apache.kylin.job.execution.ExecutableState;
-import org.apache.kylin.common.lock.MockJobLock;
+import org.apache.kylin.job.lock.MockJobLock;
 import org.junit.After;
 import org.junit.Before;
 import org.slf4j.Logger;

http://git-wip-us.apache.org/repos/asf/kylin/blob/06bf2a16/examples/test_case_data/localmeta/kylin.properties
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/kylin.properties b/examples/test_case_data/localmeta/kylin.properties
index 3866575..969e466 100644
--- a/examples/test_case_data/localmeta/kylin.properties
+++ b/examples/test_case_data/localmeta/kylin.properties
@@ -133,7 +133,7 @@ kylin.security.saml.context-path=/kylin
 kylin.test.bcc.new.key=some-value
 kylin.engine.mr.config-override.test1=test1
 kylin.engine.mr.config-override.test2=test2
-kylin.job.lock=org.apache.kylin.common.lock.MockJobLock
+kylin.job.lock=org.apache.kylin.job.lock.MockJobLock
 
 kylin.engine.provider.0=org.apache.kylin.engine.mr.MRBatchCubingEngine
 kylin.engine.provider.2=org.apache.kylin.engine.mr.MRBatchCubingEngine2
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/06bf2a16/examples/test_case_data/sandbox/kylin.properties
----------------------------------------------------------------------
diff --git a/examples/test_case_data/sandbox/kylin.properties b/examples/test_case_data/sandbox/kylin.properties
index c0a4968..684b4dd 100644
--- a/examples/test_case_data/sandbox/kylin.properties
+++ b/examples/test_case_data/sandbox/kylin.properties
@@ -112,7 +112,7 @@ kylin.query.udf.concat=org.apache.kylin.query.udf.ConcatUDF
 kylin.query.udf.version=org.apache.kylin.query.udf.VersionUDF
 
 # for test
-kylin.job.lock=org.apache.kylin.common.lock.MockJobLock
+kylin.job.lock=org.apache.kylin.job.lock.MockJobLock
 kylin.engine.mr.uhc-reducer-count=3
 
 ### CUBE ###

http://git-wip-us.apache.org/repos/asf/kylin/blob/06bf2a16/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java b/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java
index 4877ca1..b4ac42f 100644
--- a/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java
+++ b/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java
@@ -167,7 +167,7 @@ public class BaseTestDistributedScheduler extends HBaseMetadataTestCase {
     }
 
     boolean lock(ZookeeperDistributedJobLock jobLock, String cubeName, String serverName) {
-        return jobLock.lockWithClient(getLockPath(cubeName), serverName);
+        return jobLock.lockPath(getLockPath(cubeName), serverName);
     }
 
     private static void initZk() {

http://git-wip-us.apache.org/repos/asf/kylin/blob/06bf2a16/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
index 31d1ded..4ba426e 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -51,7 +51,7 @@ import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.execution.Output;
-import org.apache.kylin.common.lock.JobLock;
+import org.apache.kylin.job.lock.JobLock;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.realization.RealizationStatusEnum;
 import org.apache.kylin.rest.constant.Constant;

http://git-wip-us.apache.org/repos/asf/kylin/blob/06bf2a16/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java
index 5f5a721..eb01e4b 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java
@@ -32,7 +32,7 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
 import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.lock.DistributedJobLock;
+import org.apache.kylin.job.lock.DistributedJobLock;
 import org.apache.zookeeper.CreateMode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,7 +40,9 @@ import org.slf4j.LoggerFactory;
 public class ZookeeperDistributedJobLock implements DistributedJobLock {
     private static Logger logger = LoggerFactory.getLogger(ZookeeperDistributedJobLock.class);
 
+    @SuppressWarnings("unused")
     private final KylinConfig config;
+    
     private static final ConcurrentMap<KylinConfig, CuratorFramework> CACHE = new ConcurrentHashMap<KylinConfig, CuratorFramework>();
     private final CuratorFramework zkClient;
 
@@ -100,7 +102,7 @@ public class ZookeeperDistributedJobLock implements DistributedJobLock {
      */
 
     @Override
-    public boolean lockWithClient(String lockPath, String lockClient) {
+    public boolean lockPath(String lockPath, String lockClient) {
         logger.info(lockClient + " start lock the path: " + lockPath);
 
         boolean hasLock = false;
@@ -163,7 +165,7 @@ public class ZookeeperDistributedJobLock implements DistributedJobLock {
      */
 
     @Override
-    public boolean isHasLocked(String lockPath) {
+    public boolean isPathLocked(String lockPath) {
         try {
             return zkClient.checkExists().forPath(lockPath) != null;
         } catch (Exception e) {
@@ -183,7 +185,7 @@ public class ZookeeperDistributedJobLock implements DistributedJobLock {
      */
 
     @Override
-    public void unlock(String lockPath) {
+    public void unlockPath(String lockPath) {
         try {
             if (zkClient.checkExists().forPath(lockPath) != null) {
                 zkClient.delete().guaranteed().deletingChildrenIfNeeded().forPath(lockPath);
@@ -213,7 +215,7 @@ public class ZookeeperDistributedJobLock implements DistributedJobLock {
      */
 
     @Override
-    public PathChildrenCache watch(String watchPath, Executor watchExecutor, final WatcherProcess watcherProcess) {
+    public PathChildrenCache watchPath(String watchPath, Executor watchExecutor, final Watcher watcherProcess) {
         PathChildrenCache cache = new PathChildrenCache(zkClient, watchPath, true);
         try {
             cache.start();
@@ -236,14 +238,15 @@ public class ZookeeperDistributedJobLock implements DistributedJobLock {
     }
 
     @Override
-    public boolean lock() {
+    public boolean lockJobEngine() {
         return true;
     }
 
     @Override
-    public void unlock() {
+    public void unlockJobEngine() {
     }
 
+    @Override
     public void close() {
         try {
             zkClient.close();

http://git-wip-us.apache.org/repos/asf/kylin/blob/06bf2a16/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperJobLock.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperJobLock.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperJobLock.java
index 7315d1d..6a3cf7e 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperJobLock.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperJobLock.java
@@ -35,7 +35,7 @@ import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.lock.JobLock;
+import org.apache.kylin.job.lock.JobLock;
 import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
@@ -56,7 +56,7 @@ public class ZookeeperJobLock implements JobLock {
     private CuratorFramework zkClient;
 
     @Override
-    public boolean lock() {
+    public boolean lockJobEngine() {
         this.scheduleID = schedulerId();
         String zkConnectString = getZKConnectString();
         logger.info("zk connection string:" + zkConnectString);
@@ -100,7 +100,7 @@ public class ZookeeperJobLock implements JobLock {
     }
 
     @Override
-    public void unlock() {
+    public void unlockJobEngine() {
         releaseLock();
     }