Posted to commits@kylin.apache.org by li...@apache.org on 2016/11/08 22:34:44 UTC

[3/6] kylin git commit: KYLIN-2006 Make job engine distributed and HA

KYLIN-2006 Make job engine distributed and HA

Signed-off-by: Yang Li <li...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/4f66783e
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/4f66783e
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/4f66783e

Branch: refs/heads/master
Commit: 4f66783e1e4a765080e26e19cc0fe53a78ca599a
Parents: 5858448
Author: kangkaisen <ka...@live.com>
Authored: Mon Sep 5 20:15:23 2016 +0800
Committer: Yang Li <li...@apache.org>
Committed: Wed Nov 9 06:32:01 2016 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/KylinConfigBase.java    |   1 +
 .../kylin/job/execution/ExecutableManager.java  |  23 ++
 .../impl/threadpool/DistributedScheduler.java   | 349 +++++++++++++++++++
 .../kylin/job/lock/DistributedJobLock.java      |  29 ++
 .../org/apache/kylin/job/lock/DoWatchLock.java  |  23 ++
 .../kylin/job/BaseTestDistributedScheduler.java | 226 ++++++++++++
 .../apache/kylin/job/ContextTestExecutable.java |  51 +++
 .../job/ITDistributedSchedulerBaseTest.java     |  90 +++++
 .../job/ITDistributedSchedulerTakeOverTest.java |  60 ++++
 .../kylin/rest/controller/JobController.java    |  62 +---
 .../apache/kylin/rest/service/CubeService.java  |   4 +
 .../apache/kylin/rest/service/JobService.java   |  96 ++++-
 .../hbase/util/ZookeeperDistributedJobLock.java | 230 ++++++++++++
 13 files changed, 1182 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/4f66783e/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 6d3e807..ee9f57c 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -755,6 +755,7 @@ abstract public class KylinConfigBase implements Serializable {
     public Map<Integer, String> getSchedulers() {
         Map<Integer, String> r = convertKeyToInteger(getPropertiesByPrefix("kylin.scheduler."));
         r.put(0, "org.apache.kylin.job.impl.threadpool.DefaultScheduler");
+        r.put(2, "org.apache.kylin.job.impl.threadpool.DistributedScheduler");
         return r;
     }
 

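For reference, a minimal sketch (not part of this patch) of looking up the scheduler class registered above; it assumes KylinConfig, the concrete subclass of KylinConfigBase, is obtained through KylinConfig.getInstanceFromEnv() as elsewhere in this commit:

    import java.util.Map;

    import org.apache.kylin.common.KylinConfig;

    public class SchedulerRegistryExample {
        public static void main(String[] args) {
            // id 0 maps to DefaultScheduler, id 2 to the new DistributedScheduler (see hunk above);
            // the id matches the kylin.enable.scheduler value mentioned in the DistributedScheduler javadoc below.
            Map<Integer, String> schedulers = KylinConfig.getInstanceFromEnv().getSchedulers();
            System.out.println(schedulers.get(2));
        }
    }
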
http://git-wip-us.apache.org/repos/asf/kylin/blob/4f66783e/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
index 0901443..92fc8c9 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
@@ -235,11 +235,30 @@ public class ExecutableManager {
         }
     }
 
+    public void resumeRunningJobForce(String jobId) {
+        AbstractExecutable job = getJob(jobId);
+        if (job == null) {
+            return;
+        }
+
+        if (job instanceof DefaultChainedExecutable) {
+            List<AbstractExecutable> tasks = ((DefaultChainedExecutable) job).getTasks();
+            for (AbstractExecutable task : tasks) {
+                if (task.getStatus() == ExecutableState.RUNNING) {
+                    updateJobOutput(task.getId(), ExecutableState.READY, null, null);
+                    break;
+                }
+            }
+        }
+        updateJobOutput(jobId, ExecutableState.READY, null, null);
+    }
+
     public void resumeJob(String jobId) {
         AbstractExecutable job = getJob(jobId);
         if (job == null) {
             return;
         }
+
         if (job instanceof DefaultChainedExecutable) {
             List<AbstractExecutable> tasks = ((DefaultChainedExecutable) job).getTasks();
             for (AbstractExecutable task : tasks) {
@@ -254,6 +273,10 @@ public class ExecutableManager {
 
     public void discardJob(String jobId) {
         AbstractExecutable job = getJob(jobId);
+        if (job == null) {
+            return;
+        }
+
         if (job instanceof DefaultChainedExecutable) {
             List<AbstractExecutable> tasks = ((DefaultChainedExecutable) job).getTasks();
             for (AbstractExecutable task : tasks) {

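A usage sketch (not part of this patch) of the new resumeRunningJobForce(); note that the new files in this commit import ExecutableManager from org.apache.kylin.job.manager, so the sketch follows that import:

    import org.apache.kylin.common.KylinConfig;
    import org.apache.kylin.job.manager.ExecutableManager;

    public class ResumeForceExample {
        // jobId is a hypothetical placeholder for an existing chained job's id.
        public static void resumeForce(String jobId) {
            ExecutableManager mgr = ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv());
            // Resets the first RUNNING task and then the job itself back to READY,
            // so the fetcher on a surviving job server can re-schedule it.
            mgr.resumeRunningJobForce(jobId);
        }
    }
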
http://git-wip-us.apache.org/repos/asf/kylin/blob/4f66783e/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
new file mode 100644
index 0000000..11709c7
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.impl.threadpool;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.job.Scheduler;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.exception.SchedulerException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.job.execution.Executable;
+import org.apache.kylin.job.execution.ExecutableState;
+import org.apache.kylin.job.execution.Output;
+import org.apache.kylin.job.lock.DistributedJobLock;
+import org.apache.kylin.job.lock.DoWatchLock;
+import org.apache.kylin.job.lock.JobLock;
+import org.apache.kylin.job.manager.ExecutableManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Schedules the cubing jobs when several job servers run with the same metadata.
+ *
+ * To enable the distributed job scheduler, set and update three configs in kylin.properties:
+ *  1. kylin.enable.scheduler=2
+ *  2. kylin.job.controller.lock=org.apache.kylin.storage.hbase.util.ZookeeperDistributedJobLock
+ *  3. add all the job servers and query servers to kylin.rest.servers
+ */
+public class DistributedScheduler implements Scheduler<AbstractExecutable>, ConnectionStateListener {
+    private ExecutableManager executableManager;
+    private FetcherRunner fetcher;
+    private ScheduledExecutorService fetcherPool;
+    private ExecutorService watchPool;
+    private ExecutorService jobPool;
+    private DefaultContext context;
+    private DistributedJobLock jobLock;
+
+    private static final Logger logger = LoggerFactory.getLogger(DistributedScheduler.class);
+    private static final ConcurrentHashMap<KylinConfig, DistributedScheduler> CACHE = new ConcurrentHashMap<KylinConfig, DistributedScheduler>();
+    //keep all segments that have a running job
+    private final Set<String> segmentWithLocks = new CopyOnWriteArraySet<>();
+    private volatile boolean initialized = false;
+    private volatile boolean hasStarted = false;
+    private JobEngineConfig jobEngineConfig;
+
+    private final static String SEGMENT_ID = "segmentId";
+
+    //only for integration test
+    public static DistributedScheduler getInstance(KylinConfig config) {
+        DistributedScheduler r = CACHE.get(config);
+        if (r == null) {
+            synchronized (DistributedScheduler.class) {
+                r = CACHE.get(config);
+                if (r == null) {
+                    r = new DistributedScheduler();
+                    CACHE.put(config, r);
+                    if (CACHE.size() > 1) {
+                        logger.warn("More than one singleton exist");
+                    }
+                }
+            }
+        }
+        return r;
+    }
+
+    private class FetcherRunner implements Runnable {
+        @Override
+        synchronized public void run() {
+            try {
+                Map<String, Executable> runningJobs = context.getRunningJobs();
+                if (runningJobs.size() >= jobEngineConfig.getMaxConcurrentJobLimit()) {
+                    logger.warn("There are too many jobs running; the job fetcher will wait until the next schedule time");
+                    return;
+                }
+
+                int nRunning = 0, nOtherRunning = 0, nReady = 0, nOthers = 0;
+                for (final String id : executableManager.getAllJobIds()) {
+                    if (runningJobs.containsKey(id)) {
+                        nRunning++;
+                        continue;
+                    }
+
+                    final Output output = executableManager.getOutput(id);
+
+                    if ((output.getState() != ExecutableState.READY)) {
+                        if (output.getState() == ExecutableState.RUNNING) {
+                            nOtherRunning++;
+                        } else {
+                            nOthers++;
+                        }
+                        continue;
+                    }
+
+                    nReady++;
+                    final AbstractExecutable executable = executableManager.getJob(id);
+                    try {
+                        jobPool.execute(new JobRunner(executable));
+                    } catch (Exception ex) {
+                        logger.warn(executable.toString() + " failed to schedule in server: " + serverName, ex);
+                    }
+                }
+                logger.info("Job Fetcher: " + nRunning + " should be running, " + runningJobs.size() + " actually running, " + nOtherRunning + " running in other servers, " + nReady + " ready, " + nOthers + " others");
+            } catch (Exception e) {
+                logger.warn("Job Fetcher caught an exception " + e);
+            }
+        }
+    }
+
+    private String serverName = getServerName();
+
+    private String getServerName() {
+        String serverName = null;
+        try {
+            serverName = InetAddress.getLocalHost().getHostName();
+        } catch (UnknownHostException e) {
+            logger.error("failed to get the serverName");
+        }
+        return serverName;
+    }
+
+    //only for integration test
+    public void setServerName(String serverName) {
+        this.serverName = serverName;
+        logger.info("serverName updated to: " + this.serverName);
+    }
+
+    private class JobRunner implements Runnable {
+
+        private final AbstractExecutable executable;
+
+        public JobRunner(AbstractExecutable executable) {
+            this.executable = executable;
+        }
+
+        @Override
+        public void run() {
+            try {
+                String segmentId = executable.getParam(SEGMENT_ID);
+                if (jobLock.lockWithName(segmentId, serverName)) {
+                    logger.info(executable.toString() + " scheduled in server: " + serverName);
+
+                    context.addRunningJob(executable);
+                    segmentWithLocks.add(segmentId);
+                    executable.execute(context);
+                }
+            } catch (ExecuteException e) {
+                logger.error("ExecuteException job:" + executable.getId() + " in server: " + serverName, e);
+            } catch (Exception e) {
+                logger.error("unknown error executing job: " + executable.getId() + " in server: " + serverName, e);
+            } finally {
+                context.removeRunningJob(executable);
+                releaseJobLock(executable);
+                // trigger the next step asap
+                fetcherPool.schedule(fetcher, 0, TimeUnit.SECONDS);
+            }
+        }
+
+        //release the job lock only when all tasks of the job have finished and this job server still holds the segment lock.
+        private void releaseJobLock(AbstractExecutable executable) {
+            if (executable instanceof DefaultChainedExecutable) {
+                String segmentId = executable.getParam(SEGMENT_ID);
+                ExecutableState state = executable.getStatus();
+
+                if (state == ExecutableState.SUCCEED || state == ExecutableState.ERROR || state == ExecutableState.DISCARDED) {
+                    if (segmentWithLocks.contains(segmentId)) {
+                        logger.info(executable.toString() + " will release the lock for the segment: " + segmentId);
+                        jobLock.unlockWithName(segmentId);
+                        segmentWithLocks.remove(segmentId);
+                    }
+                }
+            }
+        }
+    }
+
+    //when the segment lock is released but the related job is still running, resume the job.
+    private class DoWatchImpl implements DoWatchLock {
+        private String serverName;
+
+        public DoWatchImpl(String serverName) {
+            this.serverName = serverName;
+        }
+
+        @Override
+        public void doWatch(String path, String nodeData) {
+            String[] paths = path.split("/");
+            String segmentId = paths[paths.length - 1];
+
+            for (final String id : executableManager.getAllJobIds()) {
+                final Output output = executableManager.getOutput(id);
+                if (output.getState() == ExecutableState.RUNNING) {
+                    AbstractExecutable executable = executableManager.getJob(id);
+                    if (executable instanceof DefaultChainedExecutable && executable.getParams().get(SEGMENT_ID).equalsIgnoreCase(segmentId) && !nodeData.equalsIgnoreCase(serverName)) {
+                        try {
+                            logger.warn(nodeData + " has released the lock for: " + segmentId + " but the job is still running, so " + serverName + " will resume the job");
+                            if (jobLock.lockWithName(segmentId, serverName)) {
+                                executableManager.resumeRunningJobForce(executable.getId());
+                                fetcherPool.schedule(fetcher, 0, TimeUnit.SECONDS);
+                                break;
+                            }
+                        } catch (Exception e) {
+                            logger.error("failed to resume the job in server: " + serverName, e);
+                        }
+                    }
+                }
+            }
+        }
+
+    }
+
+    @Override
+    public void stateChanged(CuratorFramework client, ConnectionState newState) {
+        if ((newState == ConnectionState.SUSPENDED) || (newState == ConnectionState.LOST)) {
+            try {
+                shutdown();
+            } catch (SchedulerException e) {
+                throw new RuntimeException("failed to shutdown scheduler", e);
+            }
+        }
+    }
+
+    @Override
+    public synchronized void init(JobEngineConfig jobEngineConfig, final JobLock jobLock) throws SchedulerException {
+        String serverMode = jobEngineConfig.getConfig().getServerMode();
+        if (!("job".equals(serverMode.toLowerCase()) || "all".equals(serverMode.toLowerCase()))) {
+            logger.info("server mode: " + serverMode + ", no need to run job scheduler");
+            return;
+        }
+        logger.info("Initializing Job Engine ....");
+
+        if (!initialized) {
+            initialized = true;
+        } else {
+            return;
+        }
+
+        this.jobEngineConfig = jobEngineConfig;
+        this.jobLock = (DistributedJobLock) jobLock;
+
+        executableManager = ExecutableManager.getInstance(jobEngineConfig.getConfig());
+        //load all executables and set them to a consistent status
+        fetcherPool = Executors.newScheduledThreadPool(1);
+
+        //watch for zookeeper node changes, so that when one job server goes down, other job servers can take over.
+        watchPool = Executors.newFixedThreadPool(1);
+        DoWatchImpl doWatchImpl = new DoWatchImpl(this.serverName);
+        this.jobLock.watchLock(watchPool, doWatchImpl);
+
+        int corePoolSize = jobEngineConfig.getMaxConcurrentJobLimit();
+        jobPool = new ThreadPoolExecutor(corePoolSize, corePoolSize, Long.MAX_VALUE, TimeUnit.DAYS, new SynchronousQueue<Runnable>());
+        context = new DefaultContext(Maps.<String, Executable> newConcurrentMap(), jobEngineConfig.getConfig());
+
+        resumeAllRunningJobs();
+
+        fetcher = new FetcherRunner();
+        fetcherPool.scheduleAtFixedRate(fetcher, 10, ExecutableConstants.DEFAULT_SCHEDULER_INTERVAL_SECONDS, TimeUnit.SECONDS);
+        hasStarted = true;
+    }
+
+    private void resumeAllRunningJobs() {
+        for (final String id : executableManager.getAllJobIds()) {
+            final Output output = executableManager.getOutput(id);
+            AbstractExecutable executable = executableManager.getJob(id);
+            if (output.getState() == ExecutableState.RUNNING && executable instanceof DefaultChainedExecutable) {
+                try {
+                    if (jobLock.lockWithName(executable.getParam(SEGMENT_ID), serverName)) {
+                        executableManager.resumeRunningJobForce(executable.getId());
+                        fetcherPool.schedule(fetcher, 0, TimeUnit.SECONDS);
+                    }
+                } catch (Exception e) {
+                    logger.error("failed to resume job " + id + " in server: " + serverName, e);
+                }
+            }
+        }
+    }
+
+    @Override
+    public void shutdown() throws SchedulerException {
+        logger.info("Will shut down Job Engine ....");
+
+        releaseAllLocks();
+        logger.info("All locks have been released");
+
+        watchPool.shutdown();
+        logger.info("The watchPool has been shut down");
+
+        fetcherPool.shutdown();
+        logger.info("The fetcherPool has been shut down");
+
+        jobPool.shutdown();
+        logger.info("The jobPool has been shut down");
+    }
+
+    private void releaseAllLocks() {
+        for (String segmentId : segmentWithLocks) {
+            jobLock.unlockWithName(segmentId);
+        }
+    }
+
+    @Override
+    public boolean stop(AbstractExecutable executable) throws SchedulerException {
+        if (hasStarted) {
+            return true;
+        } else {
+            //TODO should try to stop this executable
+            return true;
+        }
+    }
+
+    @Override
+    public boolean hasStarted() {
+        return this.hasStarted;
+    }
+}
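
A condensed start-up sketch for the scheduler above (not part of this patch); it mirrors the setup in BaseTestDistributedScheduler further down and assumes a server mode of "job" or "all" plus the ZooKeeper-backed lock introduced by this commit:

    import org.apache.kylin.common.KylinConfig;
    import org.apache.kylin.job.engine.JobEngineConfig;
    import org.apache.kylin.job.impl.threadpool.DistributedScheduler;
    import org.apache.kylin.storage.hbase.util.ZookeeperDistributedJobLock;

    public class DistributedSchedulerBootstrap {
        public static void main(String[] args) throws Exception {
            KylinConfig config = KylinConfig.getInstanceFromEnv();
            // Per-segment lock shared by all job servers pointing at the same metadata.
            ZookeeperDistributedJobLock jobLock = new ZookeeperDistributedJobLock();
            DistributedScheduler scheduler = DistributedScheduler.getInstance(config);
            scheduler.init(new JobEngineConfig(config), jobLock);
            if (!scheduler.hasStarted()) {
                throw new IllegalStateException("distributed scheduler did not start");
            }
        }
    }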

http://git-wip-us.apache.org/repos/asf/kylin/blob/4f66783e/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java b/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java
new file mode 100644
index 0000000..5ba8426
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.job.lock;
+
+import java.util.concurrent.ExecutorService;
+
+public interface DistributedJobLock extends JobLock {
+    boolean lockWithName(String cubeName, String serverName);
+
+    void unlockWithName(String name);
+
+    void watchLock(ExecutorService pool, DoWatchLock doWatch);
+}
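
A minimal sketch (not part of this patch) of the per-segment try-lock pattern that DistributedScheduler.JobRunner builds on; unlike the real scheduler, which keeps the lock until the chained job reaches SUCCEED, ERROR or DISCARDED, this sketch releases it unconditionally:

    import org.apache.kylin.job.lock.DistributedJobLock;

    public class SegmentLockPattern {
        public static void runWithSegmentLock(DistributedJobLock jobLock, String segmentId, String serverName, Runnable work) {
            // lockWithName is a non-blocking try-lock keyed by segment id; if another
            // job server already holds it, the work is simply skipped here.
            if (jobLock.lockWithName(segmentId, serverName)) {
                try {
                    work.run();
                } finally {
                    jobLock.unlockWithName(segmentId);
                }
            }
        }
    }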

http://git-wip-us.apache.org/repos/asf/kylin/blob/4f66783e/core-job/src/main/java/org/apache/kylin/job/lock/DoWatchLock.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/lock/DoWatchLock.java b/core-job/src/main/java/org/apache/kylin/job/lock/DoWatchLock.java
new file mode 100644
index 0000000..08c13f9
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/lock/DoWatchLock.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.lock;
+
+public interface DoWatchLock {
+    void doWatch(String path, String data);
+}
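
A small sketch (not part of this patch) of registering a DoWatchLock callback through DistributedJobLock.watchLock(); it only prints the node change that DistributedScheduler's DoWatchImpl uses to resume orphaned jobs:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    import org.apache.kylin.job.lock.DistributedJobLock;
    import org.apache.kylin.job.lock.DoWatchLock;

    public class LoggingWatchExample {
        public static void register(DistributedJobLock jobLock) {
            ExecutorService pool = Executors.newFixedThreadPool(1);
            jobLock.watchLock(pool, new DoWatchLock() {
                @Override
                public void doWatch(String path, String data) {
                    // path is the lock node, data the server that held it.
                    System.out.println("lock node changed at " + path + ", held by " + data);
                }
            });
        }
    }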

http://git-wip-us.apache.org/repos/asf/kylin/blob/4f66783e/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java b/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java
new file mode 100644
index 0000000..c33f3da
--- /dev/null
+++ b/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import org.apache.commons.lang.StringUtils;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.imps.CuratorFrameworkState;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableState;
+import org.apache.kylin.job.impl.threadpool.DistributedScheduler;
+import org.apache.kylin.job.manager.ExecutableManager;
+import org.apache.kylin.storage.hbase.HBaseConnection;
+import org.apache.kylin.storage.hbase.util.ZookeeperDistributedJobLock;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.util.Arrays;
+
+public class BaseTestDistributedScheduler {
+    static ExecutableManager jobService;
+    static ZookeeperDistributedJobLock jobLock;
+    static DistributedScheduler scheduler1;
+    static DistributedScheduler scheduler2;
+    static KylinConfig kylinConfig1;
+    static KylinConfig kylinConfig2;
+    static CuratorFramework zkClient;
+
+    static final String SEGMENT_ID = "segmentId";
+    static final String segmentId1 = "segmentId1";
+    static final String segmentId2 = "segmentId2";
+    static final String serverName1 = "serverName1";
+    static final String serverName2 = "serverName2";
+    static final String ZOOKEEPER_LOCK_PATH = "/kylin/job_engine/lock";
+    static final String confSrcPath = "../examples/test_case_data/sandbox/kylin.properties";
+    static final String confDstPath = "../examples/kylin.properties";
+    static final String SANDBOX_TEST_DATA = "../examples/test_case_data/sandbox";
+
+    private static final Logger logger = LoggerFactory.getLogger(BaseTestDistributedScheduler.class);
+
+    static {
+        try {
+            ClassUtil.addClasspath(new File(SANDBOX_TEST_DATA).getAbsolutePath());
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    @BeforeClass
+    public static void setup() throws Exception {
+        staticCreateTestMetadata(SANDBOX_TEST_DATA);
+        System.setProperty("kylin.job.controller.lock", "org.apache.kylin.storage.hbase.util.ZookeeperDistributedJobLock");
+
+        initZk();
+
+        kylinConfig1 = KylinConfig.getInstanceFromEnv();
+        jobService = ExecutableManager.getInstance(kylinConfig1);
+        for (String jobId : jobService.getAllJobIds()) {
+            jobService.deleteJob(jobId);
+        }
+
+        jobLock = new ZookeeperDistributedJobLock();
+        scheduler1 = DistributedScheduler.getInstance(kylinConfig1);
+        scheduler1.setServerName(serverName1);
+        scheduler1.init(new JobEngineConfig(kylinConfig1), jobLock);
+        if (!scheduler1.hasStarted()) {
+            throw new RuntimeException("scheduler1 not started");
+        }
+
+        String absoluteConfSrcPath = new File(confSrcPath).getAbsolutePath();
+        String absoluteConfDstPath = new File(confDstPath).getAbsolutePath();
+        copyFile(absoluteConfSrcPath, absoluteConfDstPath);
+        kylinConfig2 = KylinConfig.createInstanceFromUri(absoluteConfDstPath);
+
+        scheduler2 = DistributedScheduler.getInstance(kylinConfig2);
+        scheduler2.setServerName(serverName2);
+        scheduler2.init(new JobEngineConfig(kylinConfig2), jobLock);
+        if (!scheduler2.hasStarted()) {
+            throw new RuntimeException("scheduler2 not started");
+        }
+
+        Thread.sleep(10000);
+    }
+
+    @AfterClass
+    public static void after() throws Exception {
+        System.clearProperty(KylinConfig.KYLIN_CONF);
+        System.clearProperty("kylin.job.controller.lock");
+
+        deleteFile(confDstPath);
+    }
+
+    private static void staticCreateTestMetadata(String kylinConfigFolder) {
+        KylinConfig.destroyInstance();
+
+        if (System.getProperty(KylinConfig.KYLIN_CONF) == null && System.getenv(KylinConfig.KYLIN_CONF) == null)
+            System.setProperty(KylinConfig.KYLIN_CONF, kylinConfigFolder);
+    }
+
+    void waitForJobFinish(String jobId) {
+        while (true) {
+            AbstractExecutable job = jobService.getJob(jobId);
+            final ExecutableState status = job.getStatus();
+            if (status == ExecutableState.SUCCEED || status == ExecutableState.ERROR || status == ExecutableState.STOPPED || status == ExecutableState.DISCARDED) {
+                break;
+            } else {
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+    }
+
+    void waitForJobStatus(String jobId, ExecutableState state, long interval) {
+        while (true) {
+            AbstractExecutable job = jobService.getJob(jobId);
+            if (state == job.getStatus()) {
+                break;
+            } else {
+                try {
+                    Thread.sleep(interval);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+    }
+
+    boolean lock(ZookeeperDistributedJobLock jobLock, String cubeName, String serverName) {
+        return jobLock.lockWithName(cubeName, serverName);
+    }
+
+    private static void initZk() {
+        String zkConnectString = getZKConnectString();
+        if (StringUtils.isEmpty(zkConnectString)) {
+            throw new IllegalArgumentException("ZOOKEEPER_QUORUM is empty!");
+        }
+        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+        zkClient = CuratorFrameworkFactory.newClient(zkConnectString, retryPolicy);
+        zkClient.start();
+    }
+
+    private static String getZKConnectString() {
+        Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
+        final String serverList = conf.get(HConstants.ZOOKEEPER_QUORUM);
+        final String port = conf.get(HConstants.ZOOKEEPER_CLIENT_PORT);
+        return org.apache.commons.lang3.StringUtils.join(Iterables.transform(Arrays.asList(serverList.split(",")), new Function<String, String>() {
+            @Nullable
+            @Override
+            public String apply(String input) {
+                return input + ":" + port;
+            }
+        }), ",");
+    }
+
+    String getServerName(String cubeName) {
+        String lockPath = getLockPath(cubeName);
+        String serverName = null;
+        if (zkClient.getState().equals(CuratorFrameworkState.STARTED)) {
+            try {
+                if (zkClient.checkExists().forPath(lockPath) != null) {
+                    byte[] data = zkClient.getData().forPath(lockPath);
+                    serverName = new String(data, Charset.forName("UTF-8"));
+                }
+            } catch (Exception e) {
+                logger.error("failed to get the serverName", e);
+            }
+        }
+        return serverName;
+    }
+
+    private String getLockPath(String pathName) {
+        return ZOOKEEPER_LOCK_PATH + "/" + KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix() + "/" + pathName;
+    }
+
+    private static void copyFile(String srcPath, String dstPath) {
+        try {
+            File srcFile = new File(srcPath);
+            File dstFile = new File(dstPath);
+            Files.copy(srcFile.toPath(), dstFile.toPath());
+        } catch (Exception e) {
+            logger.error("failed to copy the file", e);
+        }
+    }
+
+    private static void deleteFile(String path) {
+        try {
+            Files.delete(new File(path).toPath());
+        } catch (Exception e) {
+            logger.error("failed to delete the file", e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/4f66783e/kylin-it/src/test/java/org/apache/kylin/job/ContextTestExecutable.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/job/ContextTestExecutable.java b/kylin-it/src/test/java/org/apache/kylin/job/ContextTestExecutable.java
new file mode 100644
index 0000000..052baad
--- /dev/null
+++ b/kylin-it/src/test/java/org/apache/kylin/job/ContextTestExecutable.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.job.impl.threadpool.DefaultContext;
+
+public class ContextTestExecutable extends AbstractExecutable {
+    public ContextTestExecutable() {
+        super();
+    }
+
+    @Override
+    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+
+        DefaultContext defaultContext = (DefaultContext) context;
+        try {
+            Thread.sleep(1000);
+        } catch (InterruptedException e) {
+        }
+        if (getHashCode(defaultContext.getConfig()) == getHashCode(KylinConfig.getInstanceFromEnv())) {
+            return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
+        } else {
+            return new ExecuteResult(ExecuteResult.State.ERROR, "error");
+        }
+    }
+
+    private int getHashCode(KylinConfig config) {
+        return System.identityHashCode(config);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/4f66783e/kylin-it/src/test/java/org/apache/kylin/job/ITDistributedSchedulerBaseTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/job/ITDistributedSchedulerBaseTest.java b/kylin-it/src/test/java/org/apache/kylin/job/ITDistributedSchedulerBaseTest.java
new file mode 100644
index 0000000..443e73b
--- /dev/null
+++ b/kylin-it/src/test/java/org/apache/kylin/job/ITDistributedSchedulerBaseTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job;
+
+import org.apache.kylin.job.exception.JobException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.job.execution.ExecutableState;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ITDistributedSchedulerBaseTest extends BaseTestDistributedScheduler {
+    @Test
+    public void testSchedulerLock() throws Exception {
+        if (!lock(jobLock, segmentId1, serverName1)) {
+            throw new JobException("fail to get the lock");
+        }
+        DefaultChainedExecutable job = new DefaultChainedExecutable();
+        job.setParam(SEGMENT_ID, segmentId1);
+        AbstractExecutable task1 = new SucceedTestExecutable();
+        task1.setParam(SEGMENT_ID, segmentId1);
+        AbstractExecutable task2 = new SucceedTestExecutable();
+        task2.setParam(SEGMENT_ID, segmentId1);
+        AbstractExecutable task3 = new SucceedTestExecutable();
+        task3.setParam(SEGMENT_ID, segmentId1);
+        job.addTask(task1);
+        job.addTask(task2);
+        job.addTask(task3);
+        jobService.addJob(job);
+
+        Assert.assertEquals(serverName1, getServerName(segmentId1));
+
+        waitForJobFinish(job.getId());
+
+        Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState());
+        Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task2.getId()).getState());
+        Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task3.getId()).getState());
+        Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(job.getId()).getState());
+
+        Assert.assertEquals(null, getServerName(segmentId1));
+    }
+
+    @Test
+    public void testSchedulerConsistent() throws Exception {
+        if (!lock(jobLock, segmentId2, serverName1)) {
+            throw new JobException("fail to get the lock");
+        }
+        DefaultChainedExecutable job = new DefaultChainedExecutable();
+        job.setParam(SEGMENT_ID, segmentId2);
+        ContextTestExecutable task1 = new ContextTestExecutable();
+        task1.setParam(SEGMENT_ID, segmentId2);
+        job.addTask(task1);
+        jobService.addJob(job);
+
+        waitForJobFinish(job.getId());
+        Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState());
+        Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(job.getId()).getState());
+
+        if (!lock(jobLock, segmentId2, serverName2)) {
+            throw new JobException("fail to get the lock");
+        }
+
+        DefaultChainedExecutable job2 = new DefaultChainedExecutable();
+        job2.setParam(SEGMENT_ID, segmentId2);
+        ContextTestExecutable task2 = new ContextTestExecutable();
+        task2.setParam(SEGMENT_ID, segmentId2);
+        job2.addTask(task2);
+        jobService.addJob(job2);
+
+        waitForJobFinish(job2.getId());
+        Assert.assertEquals(ExecutableState.ERROR, jobService.getOutput(task2.getId()).getState());
+        Assert.assertEquals(ExecutableState.ERROR, jobService.getOutput(job2.getId()).getState());
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/4f66783e/kylin-it/src/test/java/org/apache/kylin/job/ITDistributedSchedulerTakeOverTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/job/ITDistributedSchedulerTakeOverTest.java b/kylin-it/src/test/java/org/apache/kylin/job/ITDistributedSchedulerTakeOverTest.java
new file mode 100644
index 0000000..3137aef
--- /dev/null
+++ b/kylin-it/src/test/java/org/apache/kylin/job/ITDistributedSchedulerTakeOverTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job;
+
+import org.apache.kylin.job.exception.JobException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.job.execution.ExecutableState;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ITDistributedSchedulerTakeOverTest extends BaseTestDistributedScheduler {
+    @Test
+    public void testSchedulerTakeOver() throws Exception {
+        if (!lock(jobLock, segmentId2, serverName1)) {
+            throw new JobException("fail to get the lock");
+        }
+
+        DefaultChainedExecutable job = new DefaultChainedExecutable();
+        job.setParam(SEGMENT_ID, segmentId2);
+        AbstractExecutable task1 = new SucceedTestExecutable();
+        task1.setParam(SEGMENT_ID, segmentId2);
+        AbstractExecutable task2 = new SucceedTestExecutable();
+        task2.setParam(SEGMENT_ID, segmentId2);
+        AbstractExecutable task3 = new SucceedTestExecutable();
+        task3.setParam(SEGMENT_ID, segmentId2);
+        job.addTask(task1);
+        job.addTask(task2);
+        job.addTask(task3);
+        jobService.addJob(job);
+
+        waitForJobStatus(job.getId(), ExecutableState.RUNNING, 500);
+
+        scheduler1.shutdown();
+        scheduler1 = null;
+
+        waitForJobFinish(job.getId());
+
+        Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState());
+        Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task2.getId()).getState());
+        Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task3.getId()).getState());
+        Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(job.getId()).getState());
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/4f66783e/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java
index 7022bfc..16b643c 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java
@@ -24,25 +24,15 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.TimeZone;
 
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.ClassUtil;
 import org.apache.kylin.job.JobInstance;
-import org.apache.kylin.job.Scheduler;
-import org.apache.kylin.job.SchedulerFactory;
 import org.apache.kylin.job.constant.JobStatusEnum;
 import org.apache.kylin.job.constant.JobTimeFilterEnum;
-import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.exception.SchedulerException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.lock.JobLock;
 import org.apache.kylin.rest.exception.InternalErrorException;
 import org.apache.kylin.rest.request.JobListRequest;
 import org.apache.kylin.rest.service.JobService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.InitializingBean;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Controller;
 import org.springframework.web.bind.annotation.PathVariable;
@@ -50,64 +40,14 @@ import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RequestMethod;
 import org.springframework.web.bind.annotation.ResponseBody;
 
-/**
- *
- */
 @Controller
 @RequestMapping(value = "jobs")
-public class JobController extends BasicController implements InitializingBean {
+public class JobController extends BasicController {
     private static final Logger logger = LoggerFactory.getLogger(JobController.class);
 
     @Autowired
     private JobService jobService;
 
-    private JobLock jobLock;
-
-    /*
-     * (non-Javadoc)
-     * 
-     * @see
-     * org.springframework.beans.factory.InitializingBean#afterPropertiesSet()
-     */
-    @SuppressWarnings("unchecked")
-    @Override
-    public void afterPropertiesSet() throws Exception {
-
-        String timeZone = jobService.getConfig().getTimeZone();
-        TimeZone tzone = TimeZone.getTimeZone(timeZone);
-        TimeZone.setDefault(tzone);
-
-        final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-        final Scheduler<AbstractExecutable> scheduler = (Scheduler<AbstractExecutable>) SchedulerFactory.scheduler(kylinConfig.getSchedulerType());
-
-        jobLock = (JobLock) ClassUtil.newInstance(kylinConfig.getJobControllerLock());
-
-        new Thread(new Runnable() {
-            @Override
-            public void run() {
-                try {
-                    scheduler.init(new JobEngineConfig(kylinConfig), jobLock);
-                    if (!scheduler.hasStarted()) {
-                        logger.info("Job engine doesn't start in this node.");
-                    }
-                } catch (Exception e) {
-                    throw new RuntimeException(e);
-                }
-            }
-        }).start();
-
-        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
-            @Override
-            public void run() {
-                try {
-                    scheduler.shutdown();
-                } catch (SchedulerException e) {
-                    logger.error("error occurred to shutdown scheduler", e);
-                }
-            }
-        }));
-    }
-
     /**
      * get all cube jobs
      * 

http://git-wip-us.apache.org/repos/asf/kylin/blob/4f66783e/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
index 99e54b9..a6246f8 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
@@ -461,6 +461,8 @@ public class CubeService extends BasicService {
         }
 
         DefaultChainedExecutable job = new DefaultChainedExecutable();
+        //make sure the job can be scheduled when the DistributedScheduler is enabled.
+        job.setParam("segmentId", tableName);
         job.setName("Hive Column Cardinality calculation for table '" + tableName + "'");
         job.setSubmitter(submitter);
 
@@ -471,6 +473,7 @@ public class CubeService extends BasicService {
 
         step1.setMapReduceJobClass(HiveColumnCardinalityJob.class);
         step1.setMapReduceParams(param);
+        step1.setParam("segmentId", tableName);
 
         job.addTask(step1);
 
@@ -478,6 +481,7 @@ public class CubeService extends BasicService {
 
         step2.setJobClass(HiveColumnCardinalityUpdateJob.class);
         step2.setJobParams(param);
+        step2.setParam("segmentId", tableName);
         job.addTask(step2);
 
         getExecutableManager().addJob(job);
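
For context, the "segmentId" parameter set above is what DistributedScheduler.JobRunner locks on via executable.getParam("segmentId"). A sketch (not part of this patch) of how any other DefaultChainedExecutable would carry the same parameter; the table name "DEFAULT.MY_TABLE" is a hypothetical placeholder:

    import org.apache.kylin.job.execution.DefaultChainedExecutable;

    public class NonCubingJobExample {
        public static DefaultChainedExecutable newJob() {
            DefaultChainedExecutable job = new DefaultChainedExecutable();
            // Without this param the distributed scheduler has nothing to lock on.
            job.setParam("segmentId", "DEFAULT.MY_TABLE");
            job.setName("Example job for table 'DEFAULT.MY_TABLE'");
            job.setSubmitter("ADMIN");
            return job;
        }
    }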

http://git-wip-us.apache.org/repos/asf/kylin/blob/4f66783e/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
index 49b9b9f..a6a9842 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -19,6 +19,8 @@
 package org.apache.kylin.rest.service;
 
 import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.util.Calendar;
 import java.util.Collections;
 import java.util.Date;
@@ -26,8 +28,11 @@ import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TimeZone;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.ClassUtil;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.CubeUpdate;
@@ -38,15 +43,21 @@ import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
 import org.apache.kylin.engine.mr.common.MapReduceExecutable;
 import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
 import org.apache.kylin.job.JobInstance;
+import org.apache.kylin.job.Scheduler;
+import org.apache.kylin.job.SchedulerFactory;
 import org.apache.kylin.job.common.ShellExecutable;
 import org.apache.kylin.job.constant.JobStatusEnum;
 import org.apache.kylin.job.constant.JobStepStatusEnum;
 import org.apache.kylin.job.constant.JobTimeFilterEnum;
+import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.job.exception.JobException;
+import org.apache.kylin.job.exception.SchedulerException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.execution.Output;
+import org.apache.kylin.job.lock.DistributedJobLock;
+import org.apache.kylin.job.lock.JobLock;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.realization.RealizationStatusEnum;
 import org.apache.kylin.rest.constant.Constant;
@@ -56,7 +67,9 @@ import org.apache.kylin.source.SourceFactory;
 import org.apache.kylin.source.SourcePartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.InitializingBean;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.EnableAspectJAutoProxy;
 import org.springframework.security.access.prepost.PreAuthorize;
 import org.springframework.stereotype.Component;
 
@@ -69,15 +82,64 @@ import com.google.common.collect.Sets;
 /**
  * @author ysong1
  */
+
+@EnableAspectJAutoProxy(proxyTargetClass = true)
 @Component("jobService")
-public class JobService extends BasicService {
+public class JobService extends BasicService implements InitializingBean {
 
     @SuppressWarnings("unused")
     private static final Logger logger = LoggerFactory.getLogger(JobService.class);
 
+    private JobLock jobLock;
+
     @Autowired
     private AccessService accessService;
 
+    /*
+    * (non-Javadoc)
+    *
+    * @see
+    * org.springframework.beans.factory.InitializingBean#afterPropertiesSet()
+    */
+    @SuppressWarnings("unchecked")
+    @Override
+    public void afterPropertiesSet() throws Exception {
+
+        String timeZone = getConfig().getTimeZone();
+        TimeZone tzone = TimeZone.getTimeZone(timeZone);
+        TimeZone.setDefault(tzone);
+
+        final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        final Scheduler<AbstractExecutable> scheduler = (Scheduler<AbstractExecutable>) SchedulerFactory.scheduler(kylinConfig.getSchedulerType());
+
+        jobLock = (JobLock) ClassUtil.newInstance(kylinConfig.getJobControllerLock());
+
+        new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    scheduler.init(new JobEngineConfig(kylinConfig), jobLock);
+                    if (!scheduler.hasStarted()) {
+                        logger.info("scheduler has not been started");
+                    }
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        }).start();
+
+        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    scheduler.shutdown();
+                } catch (SchedulerException e) {
+                    logger.error("error occurred to shutdown scheduler", e);
+                }
+            }
+        }));
+    }
+
     public List<JobInstance> listAllJobs(final String cubeName, final String projectName, final List<JobStatusEnum> statusList, final Integer limitValue, final Integer offsetValue, final JobTimeFilterEnum timeFilter) throws IOException, JobException {
         Integer limit = (null == limitValue) ? 30 : limitValue;
         Integer offset = (null == offsetValue) ? 0 : offsetValue;
@@ -215,12 +277,15 @@ public class JobService extends BasicService {
             SourcePartition sourcePartition = new SourcePartition(startDate, endDate, startOffset, endOffset, sourcePartitionOffsetStart, sourcePartitionOffsetEnd);
             sourcePartition = source.parsePartitionBeforeBuild(cube, sourcePartition);
             CubeSegment newSeg = getCubeManager().appendSegment(cube, sourcePartition);
+            lockSegment(newSeg.getUuid());
             job = EngineFactory.createBatchCubingJob(newSeg, submitter);
         } else if (buildType == CubeBuildTypeEnum.MERGE) {
             CubeSegment newSeg = getCubeManager().mergeSegments(cube, startDate, endDate, startOffset, endOffset, force);
+            lockSegment(newSeg.getUuid());
             job = EngineFactory.createBatchMergeJob(newSeg, submitter);
         } else if (buildType == CubeBuildTypeEnum.REFRESH) {
             CubeSegment refreshSeg = getCubeManager().refreshSegment(cube, startDate, endDate, startOffset, endOffset);
+            lockSegment(refreshSeg.getUuid());
             job = EngineFactory.createBatchCubingJob(refreshSeg, submitter);
         } else {
             throw new JobException("invalid build type:" + buildType);
@@ -363,6 +428,8 @@ public class JobService extends BasicService {
 
     @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#job, 'ADMINISTRATION') or hasPermission(#job, 'OPERATION') or hasPermission(#job, 'MANAGEMENT')")
     public void resumeJob(JobInstance job) throws IOException, JobException {
+        lockSegment(job.getRelatedSegment());
+
         getExecutableManager().resumeJob(job.getId());
     }
 
@@ -380,7 +447,34 @@ public class JobService extends BasicService {
             }
         }
         getExecutableManager().discardJob(job.getId());
+
+        //release the segment lock when the job is discarded but has not yet been scheduled
+        releaseSegmentLock(job.getRelatedSegment());
+
         return job;
     }
 
+    private void lockSegment(String segmentId) throws JobException {
+        if (jobLock instanceof DistributedJobLock) {
+            if (!((DistributedJobLock) jobLock).lockWithName(segmentId, getServerName())) {
+                throw new JobException("Failed to get the segment lock; the segment may be building on another job server");
+            }
+        }
+    }
+
+    private void releaseSegmentLock(String segmentId) {
+        if (jobLock instanceof DistributedJobLock) {
+            ((DistributedJobLock) jobLock).unlockWithName(segmentId);
+        }
+    }
+
+    private String getServerName() {
+        String serverName = null;
+        try {
+            serverName = InetAddress.getLocalHost().getHostName();
+        } catch (UnknownHostException e) {
+            logger.error("fail to get the hostname");
+        }
+        return serverName;
+    }
 }
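
The JobService changes above talk to the lock purely through the DistributedJobLock interface in org.apache.kylin.job.lock (see the lockSegment/releaseSegmentLock helpers and the instanceof checks). A rough sketch of that contract, inferred only from how it is used in this patch rather than from the actual interface file, could look like:

    package org.apache.kylin.job.lock;

    import java.util.concurrent.ExecutorService;

    // Sketch only: the method set is inferred from JobService above and ZookeeperDistributedJobLock below;
    // whether the interface really extends the existing JobLock is an assumption.
    public interface DistributedJobLock extends JobLock {
        // try to hold the per-segment lock for this job server; false means another server owns it
        boolean lockWithName(String name, String serverName);

        // drop the per-segment lock, e.g. when the related job is discarded
        void unlockWithName(String name);

        // register a watcher so lock-release events (a job server going down) can trigger take-over
        void watchLock(ExecutorService pool, DoWatchLock doWatch);
    }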

http://git-wip-us.apache.org/repos/asf/kylin/blob/4f66783e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java
new file mode 100644
index 0000000..eba7a20
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.util;
+
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.concurrent.ExecutorService;
+
+import javax.annotation.Nullable;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.imps.CuratorFrameworkState;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.job.lock.DistributedJobLock;
+import org.apache.kylin.job.lock.DoWatchLock;
+import org.apache.kylin.storage.hbase.HBaseConnection;
+import org.apache.zookeeper.CreateMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+
+/**
+ * A ZooKeeper-based job lock, used specifically to support the distributed scheduler.
+ */
+
+public class ZookeeperDistributedJobLock implements DistributedJobLock {
+    private static Logger logger = LoggerFactory.getLogger(ZookeeperDistributedJobLock.class);
+
+    private static final String ZOOKEEPER_LOCK_PATH = "/kylin/job_engine/lock";
+    private static CuratorFramework zkClient;
+    private static PathChildrenCache childrenCache;
+
+    static {
+        String zkConnectString = getZKConnectString();
+        logger.info("zk connection string:" + zkConnectString);
+        if (StringUtils.isEmpty(zkConnectString)) {
+            throw new IllegalArgumentException("ZOOKEEPER_QUORUM is empty!");
+        }
+
+        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+        zkClient = CuratorFrameworkFactory.newClient(zkConnectString, retryPolicy);
+        zkClient.start();
+
+        childrenCache = new PathChildrenCache(zkClient, getWatchPath(), true);
+        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    childrenCache.close();
+                    zkClient.close();
+                } catch (Exception e) {
+                    logger.error("error occurred to close PathChildrenCache", e);
+                }
+            }
+        }));
+    }
+
+    /**
+     * Lock the segment with the given segmentId and serverName.
+     *
+     * <p> Before a segment-related job can be scheduled,
+     * it must acquire the segment lock. segmentId determines the lock path,
+     * and serverName marks which job server holds the segment lock.
+     *
+     * @param segmentId the id of the segment to lock
+     *
+     * @param serverName the hostname of the job server
+     *
+     * @return <tt>true</tt> if the segment is locked successfully
+     */
+
+    @Override
+    public boolean lockWithName(String segmentId, String serverName) {
+        String lockPath = getLockPath(segmentId);
+        logger.info(serverName + " start lock the segment: " + segmentId);
+
+        boolean hasLock = false;
+        try {
+            if (!(zkClient.getState().equals(CuratorFrameworkState.STARTED))) {
+                logger.error("zookeeper have not start");
+                return false;
+            }
+            if (zkClient.checkExists().forPath(lockPath) != null) {
+                if (hasLock(serverName, lockPath)) {
+                    hasLock = true;
+                    logger.info(serverName + " has kept the lock for segment: " + segmentId);
+                }
+            } else {
+                zkClient.create().withMode(CreateMode.EPHEMERAL).forPath(lockPath, serverName.getBytes(Charset.forName("UTF-8")));
+                if (hasLock(serverName, lockPath)) {
+                    hasLock = true;
+                    logger.info(serverName + " lock the segment: " + segmentId + " successfully");
+                }
+            }
+        } catch (Exception e) {
+            logger.error(serverName + " error acquire lock for the segment: " + segmentId, e);
+        }
+        if (!hasLock) {
+            logger.info(serverName + " fail to acquire lock for the segment: " + segmentId);
+            return false;
+        }
+        return true;
+    }
+
+    private boolean hasLock(String serverName, String lockPath) {
+        String lockServerName = null;
+        try {
+            if (zkClient.checkExists().forPath(lockPath) != null) {
+                byte[] data = zkClient.getData().forPath(lockPath);
+                lockServerName = new String(data, Charset.forName("UTF-8"));
+            }
+        } catch (Exception e) {
+            logger.error("fail to get the serverName for the path: " + lockPath, e);
+        }
+        return lockServerName.equalsIgnoreCase(serverName);
+    }
+
+    /**
+     * Release the segment lock for the given segmentId.
+     *
+     * <p> The zookeeper node related to the segment will be deleted.
+     *
+     * @param segmentId the id of the segment whose lock should be released
+     */
+
+    @Override
+    public void unlockWithName(String segmentId) {
+        String lockPath = getLockPath(segmentId);
+        try {
+            if (zkClient.getState().equals(CuratorFrameworkState.STARTED)) {
+                if (zkClient.checkExists().forPath(lockPath) != null) {
+                    zkClient.delete().guaranteed().deletingChildrenIfNeeded().forPath(lockPath);
+                    logger.info("the lock for " + segmentId + " release successfully");
+                } else {
+                    logger.info("the lock for " + segmentId + " has released");
+                }
+            }
+        } catch (Exception e) {
+            logger.error("error release lock :" + segmentId);
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Watch for changes on the zookeeper nodes of all locked segments,
+     * so that when one job server goes down, another job server can take over its running jobs.
+     *
+     * @param pool the thread pool that watches the zookeeper node changes
+     * @param doWatch the callback invoked with the zookeeper node path and node data
+     */
+
+    @Override
+    public void watchLock(ExecutorService pool, final DoWatchLock doWatch) {
+        try {
+            childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
+            childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
+                @Override
+                public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
+                    switch (event.getType()) {
+                    case CHILD_REMOVED:
+                        doWatch.doWatch(event.getData().getPath(), new String(event.getData().getData(), Charset.forName("UTF-8")));
+                        break;
+                    default:
+                        break;
+                    }
+                }
+            }, pool);
+        } catch (Exception e) {
+            logger.warn("watch the zookeeper node fail: " + e);
+        }
+    }
+
+    private static String getZKConnectString() {
+        Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
+        final String serverList = conf.get(HConstants.ZOOKEEPER_QUORUM);
+        final String port = conf.get(HConstants.ZOOKEEPER_CLIENT_PORT);
+        return org.apache.commons.lang3.StringUtils.join(Iterables.transform(Arrays.asList(serverList.split(",")), new Function<String, String>() {
+            @Nullable
+            @Override
+            public String apply(String input) {
+                return input + ":" + port;
+            }
+        }), ",");
+    }
+
+    private String getLockPath(String pathName) {
+        return ZOOKEEPER_LOCK_PATH + "/" + KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix() + "/" + pathName;
+    }
+
+    private static String getWatchPath() {
+        return ZOOKEEPER_LOCK_PATH + "/" + KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix();
+    }
+
+    @Override
+    public boolean lock() {
+        return true;
+    }
+
+    @Override
+    public void unlock() {
+
+    }
+}
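
Putting the pieces together, the intended flow is: a job server acquires the per-segment lock before its job is scheduled, every server watches the lock nodes so a CHILD_REMOVED event (the owner's ephemeral node disappearing when that server dies) can trigger job take-over, and the lock is released when the job finishes or is discarded. A minimal usage sketch, with placeholder segment id, server name and take-over body (the real wiring lives in DistributedScheduler and JobService):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    import org.apache.kylin.job.lock.DistributedJobLock;
    import org.apache.kylin.job.lock.DoWatchLock;
    import org.apache.kylin.storage.hbase.util.ZookeeperDistributedJobLock;

    public class SegmentLockUsageSketch {
        public static void main(String[] args) {
            DistributedJobLock jobLock = new ZookeeperDistributedJobLock();
            String segmentId = "segment-uuid";   // placeholder segment id
            String serverName = "job-server-1";  // placeholder host name

            // 1. acquire the per-segment lock before submitting the segment's job
            if (!jobLock.lockWithName(segmentId, serverName)) {
                return; // another job server is already building this segment
            }

            // 2. watch the lock nodes; a removed child means its owner went down
            ExecutorService watchPool = Executors.newFixedThreadPool(1);
            jobLock.watchLock(watchPool, new DoWatchLock() {
                @Override
                public void doWatch(String path, String nodeData) {
                    // take-over hook: re-acquire the lock and resume the job here
                }
            });

            // 3. release the lock once the job finishes or is discarded
            jobLock.unlockWithName(segmentId);
        }
    }

The single callback doWatch(String, String) on DoWatchLock is assumed from the listener registered in watchLock above.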