You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by nj...@apache.org on 2018/06/23 23:25:14 UTC

[kylin] 01/02: KYLIN-3421 improve the fetcher runner in job scheduler

This is an automated email from the ASF dual-hosted git repository.

nju_yaho pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit eddab3791b3322e8f8c42d5254e03c4f334897af
Author: Zhong <nj...@apache.org>
AuthorDate: Wed Jun 20 17:46:10 2018 +0800

    KYLIN-3421 improve the fetcher runner in job scheduler
---
 .../job/impl/threadpool/DefaultFetcherRunner.java  | 104 ++++++++++
 .../job/impl/threadpool/DefaultScheduler.java      | 210 ++-------------------
 .../job/impl/threadpool/DistributedScheduler.java  | 116 ++++--------
 .../kylin/job/impl/threadpool/FetcherRunner.java   |  77 ++++++++
 .../kylin/job/impl/threadpool/JobExecutor.java     |  25 +++
 .../job/impl/threadpool/PriorityFetcherRunner.java | 146 ++++++++++++++
 .../job/impl/threadpool/BaseSchedulerTest.java     |   2 +-
 7 files changed, 401 insertions(+), 279 deletions(-)

diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultFetcherRunner.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultFetcherRunner.java
new file mode 100644
index 0000000..e5f15fe
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultFetcherRunner.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.job.impl.threadpool;
+
+import java.util.Map;
+
+import org.apache.kylin.common.util.SetThreadName;
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.Executable;
+import org.apache.kylin.job.execution.ExecutableManager;
+import org.apache.kylin.job.execution.ExecutableState;
+import org.apache.kylin.job.execution.Output;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DefaultFetcherRunner extends FetcherRunner {
+
+    private static final Logger logger = LoggerFactory.getLogger(DefaultFetcherRunner.class);
+
+    public DefaultFetcherRunner(JobEngineConfig jobEngineConfig, DefaultContext context,
+            ExecutableManager executableManager, JobExecutor jobExecutor) {
+        super(jobEngineConfig, context, executableManager, jobExecutor);
+    }
+
+    @Override
+    synchronized public void run() {
+        try (SetThreadName ignored = new SetThreadName(//
+                "FetcherRunner %s", System.identityHashCode(this))) {//
+            // logger.debug("Job Fetcher is running...");
+            Map<String, Executable> runningJobs = context.getRunningJobs();
+            if (isJobPoolFull()) {
+                return;
+            }
+
+            int nRunning = 0, nReady = 0, nStopped = 0, nOthers = 0, nError = 0, nDiscarded = 0, nSUCCEED = 0;
+            for (final String id : executableManager.getAllJobIds()) {
+                if (isJobPoolFull()) {
+                    return;
+                }
+                if (runningJobs.containsKey(id)) {
+                    // logger.debug("Job id:" + id + " is already running");
+                    nRunning++;
+                    continue;
+                }
+
+                final Output output = executableManager.getOutput(id);
+                if ((output.getState() != ExecutableState.READY)) {
+                    // logger.debug("Job id:" + id + " not runnable");
+                    if (output.getState() == ExecutableState.SUCCEED) {
+                        nSUCCEED++;
+                    } else if (output.getState() == ExecutableState.ERROR) {
+                        nError++;
+                    } else if (output.getState() == ExecutableState.DISCARDED) {
+                        nDiscarded++;
+                    } else if (output.getState() == ExecutableState.STOPPED) {
+                        nStopped++;
+                    } else {
+                        if (fetchFailed) {
+                            executableManager.forceKillJob(id);
+                            nError++;
+                        } else {
+                            nOthers++;
+                        }
+                    }
+                    continue;
+                }
+
+                final AbstractExecutable executable = executableManager.getJob(id);
+                if (!executable.isReady()) {
+                    nOthers++;
+                    continue;
+                }
+
+                nReady++;
+                addToJobPool(executable, executable.getDefaultPriority());
+            }
+
+            fetchFailed = false;
+            logger.info("Job Fetcher: " + nRunning + " should running, " + runningJobs.size() + " actual running, "
+                    + nStopped + " stopped, " + nReady + " ready, " + nSUCCEED + " already succeed, " + nError
+                    + " error, " + nDiscarded + " discarded, " + nOthers + " others");
+        } catch (Throwable th) {
+            fetchFailed = true; // this could happen when resource store is unavailable
+            logger.warn("Job Fetcher caught a exception ", th);
+        }
+    }
+}
diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
index 920601d..c566408 100644
--- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
@@ -18,9 +18,6 @@
 
 package org.apache.kylin.job.impl.threadpool;
 
-import java.util.Comparator;
-import java.util.Map;
-import java.util.PriorityQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -31,7 +28,6 @@ import java.util.concurrent.TimeUnit;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
-import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.common.util.SetThreadName;
 import org.apache.kylin.job.Scheduler;
 import org.apache.kylin.job.engine.JobEngineConfig;
@@ -40,8 +36,6 @@ import org.apache.kylin.job.exception.SchedulerException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.Executable;
 import org.apache.kylin.job.execution.ExecutableManager;
-import org.apache.kylin.job.execution.ExecutableState;
-import org.apache.kylin.job.execution.Output;
 import org.apache.kylin.job.lock.JobLock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -84,7 +78,7 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti
     
     private JobLock jobLock;
     private ExecutableManager executableManager;
-    private Runnable fetcher;
+    private FetcherRunner fetcher;
     private ScheduledExecutorService fetcherPool;
     private ExecutorService jobPool;
     private DefaultContext context;
@@ -92,7 +86,6 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti
     private static final Logger logger = LoggerFactory.getLogger(DefaultScheduler.class);
     private volatile boolean initialized = false;
     private volatile boolean hasStarted = false;
-    volatile boolean fetchFailed = false;
     private JobEngineConfig jobEngineConfig;
 
     public DefaultScheduler() {
@@ -101,195 +94,8 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti
         }
     }
 
-    private class FetcherRunnerWithPriority implements Runnable {
-        volatile PriorityQueue<Pair<AbstractExecutable, Integer>> jobPriorityQueue = new PriorityQueue<>(1,
-                new Comparator<Pair<AbstractExecutable, Integer>>() {
-                    @Override
-                    public int compare(Pair<AbstractExecutable, Integer> o1, Pair<AbstractExecutable, Integer> o2) {
-                        return o1.getSecond() > o2.getSecond() ? -1 : 1;
-                    }
-                });
-
-        private void addToJobPool(AbstractExecutable executable, int priority) {
-            String jobDesc = executable.toString();
-            logger.info(jobDesc + " prepare to schedule and its priority is " + priority);
-            try {
-                context.addRunningJob(executable);
-                jobPool.execute(new JobRunner(executable));
-                logger.info(jobDesc + " scheduled");
-            } catch (Exception ex) {
-                context.removeRunningJob(executable);
-                logger.warn(jobDesc + " fail to schedule", ex);
-            }
-        }
-
-        @Override
-        synchronized public void run() {
-            try (SetThreadName ignored = new SetThreadName(//
-                    "Scheduler %s PriorityFetcherRunner %s"//
-                    , System.identityHashCode(DefaultScheduler.this)//
-                    , System.identityHashCode(this)//
-            )) {//
-                // logger.debug("Job Fetcher is running...");
-                Map<String, Executable> runningJobs = context.getRunningJobs();
-
-                // fetch job from jobPriorityQueue first to reduce chance to scan job list
-                Map<String, Integer> leftJobPriorities = Maps.newHashMap();
-                Pair<AbstractExecutable, Integer> executableWithPriority;
-                while ((executableWithPriority = jobPriorityQueue.peek()) != null
-                        // the priority of jobs in pendingJobPriorities should be above a threshold
-                        && executableWithPriority.getSecond() >= jobEngineConfig.getFetchQueuePriorityBar()) {
-                    executableWithPriority = jobPriorityQueue.poll();
-                    AbstractExecutable executable = executableWithPriority.getFirst();
-                    int curPriority = executableWithPriority.getSecond();
-                    // the job should wait more than one time
-                    if (curPriority > executable.getDefaultPriority() + 1) {
-                        addToJobPool(executable, curPriority);
-                    } else {
-                        leftJobPriorities.put(executable.getId(), curPriority + 1);
-                    }
-                }
-
-                if (runningJobs.size() >= jobEngineConfig.getMaxConcurrentJobLimit()) {
-                    logger.warn("There are too many jobs running, Job Fetch will wait until next schedule time");
-                    return;
-                }
-
-                while ((executableWithPriority = jobPriorityQueue.poll()) != null) {
-                    leftJobPriorities.put(executableWithPriority.getFirst().getId(),
-                            executableWithPriority.getSecond() + 1);
-                }
-
-                int nRunning = 0, nReady = 0, nStopped = 0, nOthers = 0, nError = 0, nDiscarded = 0, nSUCCEED = 0;
-                for (final String id : executableManager.getAllJobIds()) {
-                    if (runningJobs.containsKey(id)) {
-                        // logger.debug("Job id:" + id + " is already running");
-                        nRunning++;
-                        continue;
-                    }
-
-                    AbstractExecutable executable = executableManager.getJob(id);
-                    if (!executable.isReady()) {
-                        final Output output = executableManager.getOutput(id);
-                        // logger.debug("Job id:" + id + " not runnable");
-                        if (output.getState() == ExecutableState.DISCARDED) {
-                            nDiscarded++;
-                        } else if (output.getState() == ExecutableState.ERROR) {
-                            nError++;
-                        } else if (output.getState() == ExecutableState.SUCCEED) {
-                            nSUCCEED++;
-                        } else if (output.getState() == ExecutableState.STOPPED) {
-                            nStopped++;
-                        } else {
-                            nOthers++;
-                        }
-                        continue;
-                    }
-
-                    nReady++;
-                    Integer priority = leftJobPriorities.get(id);
-                    if (priority == null) {
-                        priority = executable.getDefaultPriority();
-                    }
-                    jobPriorityQueue.add(new Pair<>(executable, priority));
-                }
-
-                while (runningJobs.size() < jobEngineConfig.getMaxConcurrentJobLimit()
-                        && (executableWithPriority = jobPriorityQueue.poll()) != null) {
-                    addToJobPool(executableWithPriority.getFirst(), executableWithPriority.getSecond());
-                }
-
-                logger.info("Priority Job Fetcher: " + nRunning + " running, " + runningJobs.size() + " actual running, "
-                        + nStopped + " stopped, " + nReady + " ready, " + jobPriorityQueue.size() + " waiting, " //
-                        + nSUCCEED + " already succeed, " + nError + " error, " + nDiscarded + " discarded, " + nOthers
-                        + " others");
-            } catch (Throwable th) {
-                logger.warn("Priority Job Fetcher caught a exception " + th);
-            }
-        }
-    }
-
-    private class FetcherRunner implements Runnable {
-
-        @Override
-        synchronized public void run() {
-            try (SetThreadName ignored = new SetThreadName(//
-                    "Scheduler %s FetcherRunner %s"//
-                    , System.identityHashCode(DefaultScheduler.this)//
-                    , System.identityHashCode(this)//
-            )) {//
-                // logger.debug("Job Fetcher is running...");
-                Map<String, Executable> runningJobs = context.getRunningJobs();
-                if (isJobPoolFull()) {
-                    return;
-                }
-
-                int nRunning = 0, nReady = 0, nStopped = 0, nOthers = 0, nError = 0, nDiscarded = 0, nSUCCEED = 0;
-                for (final String id : executableManager.getAllJobIds()) {
-                    if (isJobPoolFull()) {
-                        return;
-                    }
-                    if (runningJobs.containsKey(id)) {
-                        // logger.debug("Job id:" + id + " is already running");
-                        nRunning++;
-                        continue;
-                    }
-                    final AbstractExecutable executable = executableManager.getJob(id);
-                    if (!executable.isReady()) {
-                        final Output output = executableManager.getOutput(id);
-                        // logger.debug("Job id:" + id + " not runnable");
-                        if (output.getState() == ExecutableState.DISCARDED) {
-                            nDiscarded++;
-                        } else if (output.getState() == ExecutableState.ERROR) {
-                            nError++;
-                        } else if (output.getState() == ExecutableState.SUCCEED) {
-                            nSUCCEED++;
-                        } else if (output.getState() == ExecutableState.STOPPED) {
-                            nStopped++;
-                        } else {
-                            if (fetchFailed) {
-                                executableManager.forceKillJob(id);
-                                nError++;
-                            } else {
-                                nOthers++;
-                            }
-                        }
-                        continue;
-                    }
-                    nReady++;
-                    String jobDesc = null;
-                    try {
-                        jobDesc = executable.toString();
-                        logger.info(jobDesc + " prepare to schedule");
-                        context.addRunningJob(executable);
-                        jobPool.execute(new JobRunner(executable));
-                        logger.info(jobDesc + " scheduled");
-                    } catch (Exception ex) {
-                        if (executable != null)
-                            context.removeRunningJob(executable);
-                        logger.warn(jobDesc + " fail to schedule", ex);
-                    }
-                }
-
-                fetchFailed = false;
-                logger.info("Job Fetcher: " + nRunning + " should running, " + runningJobs.size() + " actual running, "
-                        + nStopped + " stopped, " + nReady + " ready, " + nSUCCEED + " already succeed, " + nError
-                        + " error, " + nDiscarded + " discarded, " + nOthers + " others");
-            } catch (Throwable th) {
-                fetchFailed = true; // this could happen when resource store is unavailable
-                logger.warn("Job Fetcher caught a exception ", th);
-            }
-        }
-    }
-
-    private boolean isJobPoolFull() {
-        Map<String, Executable> runningJobs = context.getRunningJobs();
-        if (runningJobs.size() >= jobEngineConfig.getMaxConcurrentJobLimit()) {
-            logger.warn("There are too many jobs running, Job Fetch will wait until next schedule time");
-            return true;
-        }
-
-        return false;
+    public FetcherRunner getFetcherRunner() {
+        return fetcher;
     }
 
     private class JobRunner implements Runnable {
@@ -367,7 +173,15 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti
         int pollSecond = jobEngineConfig.getPollIntervalSecond();
 
         logger.info("Fetching jobs every {} seconds", pollSecond);
-        fetcher = jobEngineConfig.getJobPriorityConsidered() ? new FetcherRunnerWithPriority() : new FetcherRunner();
+        JobExecutor jobExecutor = new JobExecutor() {
+            @Override
+            public void execute(AbstractExecutable executable) {
+                jobPool.execute(new JobRunner(executable));
+            }
+        };
+        fetcher = jobEngineConfig.getJobPriorityConsidered()
+                ? new PriorityFetcherRunner(jobEngineConfig, context, executableManager, jobExecutor)
+                : new DefaultFetcherRunner(jobEngineConfig, context, executableManager, jobExecutor);
         logger.info("Creating fetcher pool instance:" + System.identityHashCode(fetcher));
         fetcherPool.scheduleAtFixedRate(fetcher, pollSecond / 10, pollSecond, TimeUnit.SECONDS);
         hasStarted = true;
diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
index 055de4d..cb4d815 100644
--- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
@@ -20,7 +20,6 @@ package org.apache.kylin.job.impl.threadpool;
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.ExecutorService;
@@ -63,7 +62,6 @@ import com.google.common.collect.Maps;
 public class DistributedScheduler implements Scheduler<AbstractExecutable>, ConnectionStateListener {
     private static final Logger logger = LoggerFactory.getLogger(DistributedScheduler.class);
     
-    private final static String SEGMENT_ID = "segmentId";
     public static final String ZOOKEEPER_LOCK_PATH = "/job_engine/lock"; // note ZookeeperDistributedLock will ensure zk path prefix: /${kylin.env.zookeeper-base-path}/metadata
 
     public static DistributedScheduler getInstance(KylinConfig config) {
@@ -86,57 +84,13 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
     private DistributedLock jobLock;
     private Closeable lockWatch;
 
-    //keep all segments having running job
-    private final Set<String> segmentWithLocks = new CopyOnWriteArraySet<>();
+    //keep the ids of all jobs this scheduler currently holds a lock for
+    private final Set<String> jobWithLocks = new CopyOnWriteArraySet<>();
     private volatile boolean initialized = false;
     private volatile boolean hasStarted = false;
     private JobEngineConfig jobEngineConfig;
     private String serverName;
 
-    private class FetcherRunner implements Runnable {
-        @Override
-        synchronized public void run() {
-            try {
-                Map<String, Executable> runningJobs = context.getRunningJobs();
-                if (runningJobs.size() >= jobEngineConfig.getMaxConcurrentJobLimit()) {
-                    logger.warn("There are too many jobs running, Job Fetch will wait until next schedule time");
-                    return;
-                }
-
-                int nRunning = 0, nOtherRunning = 0, nReady = 0, nOthers = 0;
-                for (final String id : executableManager.getAllJobIds()) {
-                    if (runningJobs.containsKey(id)) {
-                        nRunning++;
-                        continue;
-                    }
-
-                    final Output output = executableManager.getOutput(id);
-
-                    if ((output.getState() != ExecutableState.READY)) {
-                        if (output.getState() == ExecutableState.RUNNING) {
-                            nOtherRunning++;
-                        } else {
-                            nOthers++;
-                        }
-                        continue;
-                    }
-
-                    nReady++;
-                    final AbstractExecutable executable = executableManager.getJob(id);
-                    try {
-                        jobPool.execute(new JobRunner(executable));
-                    } catch (Exception ex) {
-                        logger.warn(executable.toString() + " fail to schedule in server: " + serverName, ex);
-                    }
-                }
-                logger.info("Job Fetcher: " + nRunning + " should running, " + runningJobs.size() + " actual running, "
-                        + nOtherRunning + " running in other server, " + nReady + " ready, " + nOthers + " others");
-            } catch (Exception e) {
-                logger.warn("Job Fetcher caught a exception " + e);
-            }
-        }
-    }
-
     private class JobRunner implements Runnable {
 
         private final AbstractExecutable executable;
@@ -149,12 +103,11 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
         public void run() {
             try (SetThreadName ignored = new SetThreadName("Scheduler %s Job %s",
                     System.identityHashCode(DistributedScheduler.this), executable.getId())) {
-                String segmentId = executable.getParam(SEGMENT_ID);
-                if (jobLock.lock(getLockPath(segmentId))) {
+                if (jobLock.lock(getLockPath(executable.getId()))) {
                     logger.info(executable.toString() + " scheduled in server: " + serverName);
 
                     context.addRunningJob(executable);
-                    segmentWithLocks.add(segmentId);
+                    jobWithLocks.add(executable.getId());
                     executable.execute(context);
                 }
             } catch (ExecuteException e) {
@@ -172,21 +125,21 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
         //release job lock when job state is ready or running and the job server keep the cube lock.
         private void releaseJobLock(AbstractExecutable executable) {
             if (executable instanceof DefaultChainedExecutable) {
-                String segmentId = executable.getParam(SEGMENT_ID);
                 ExecutableState state = executable.getStatus();
 
                 if (state != ExecutableState.READY && state != ExecutableState.RUNNING) {
-                    if (segmentWithLocks.contains(segmentId)) {
-                        logger.info(executable.toString() + " will release the lock for the segment: " + segmentId);
-                        jobLock.unlock(getLockPath(segmentId));
-                        segmentWithLocks.remove(segmentId);
+                    if (jobWithLocks.contains(executable.getId())) {
+                        logger.info(
+                                executable.toString() + " will release the lock for the job: " + executable.getId());
+                        jobLock.unlock(getLockPath(executable.getId()));
+                        jobWithLocks.remove(executable.getId());
                     }
                 }
             }
         }
     }
 
-    //when the segment lock released but the segment related job still running, resume the job.
+    //when the job lock is released but the related job is still running, resume the job.
     private class WatcherProcessImpl implements DistributedLock.Watcher {
         private String serverName;
 
@@ -197,26 +150,21 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
         @Override
         public void onUnlock(String path, String nodeData) {
             String[] paths = path.split("/");
-            String segmentId = paths[paths.length - 1];
-
-            for (final String id : executableManager.getAllJobIds()) {
-                final Output output = executableManager.getOutput(id);
-                if (output.getState() == ExecutableState.RUNNING) {
-                    AbstractExecutable executable = executableManager.getJob(id);
-                    if (executable instanceof DefaultChainedExecutable
-                            && executable.getParams().get(SEGMENT_ID).equalsIgnoreCase(segmentId)
-                            && !nodeData.equalsIgnoreCase(serverName)) {
-                        try {
-                            logger.warn(nodeData + " has released the lock for: " + segmentId
-                                    + " but the job still running. so " + serverName + " resume the job");
-                            if (!jobLock.isLocked(getLockPath(segmentId))) {
-                                executableManager.resumeRunningJobForce(executable.getId());
-                                fetcherPool.schedule(fetcher, 0, TimeUnit.SECONDS);
-                                break;
-                            }
-                        } catch (Exception e) {
-                            logger.error("resume the job but fail in server: " + serverName, e);
+            String jobId = paths[paths.length - 1];
+
+            final Output output = executableManager.getOutput(jobId);
+            if (output.getState() == ExecutableState.RUNNING) {
+                AbstractExecutable executable = executableManager.getJob(jobId);
+                if (executable instanceof DefaultChainedExecutable && !nodeData.equalsIgnoreCase(serverName)) {
+                    try {
+                        logger.warn(nodeData + " has released the lock for: " + jobId
+                                + " but the job still running. so " + serverName + " resume the job");
+                        if (!jobLock.isLocked(getLockPath(jobId))) {
+                            executableManager.resumeRunningJobForce(executable.getId());
+                            fetcherPool.schedule(fetcher, 0, TimeUnit.SECONDS);
                         }
+                    } catch (Exception e) {
+                        logger.error("resume the job but fail in server: " + serverName, e);
                     }
                 }
             }
@@ -273,7 +221,15 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
 
         int pollSecond = jobEngineConfig.getPollIntervalSecond();
         logger.info("Fetching jobs every {} seconds", pollSecond);
-        fetcher = new FetcherRunner();
+        JobExecutor jobExecutor = new JobExecutor() {
+            @Override
+            public void execute(AbstractExecutable executable) {
+                jobPool.execute(new JobRunner(executable));
+            }
+        };
+        fetcher = jobEngineConfig.getJobPriorityConsidered()
+                ? new PriorityFetcherRunner(jobEngineConfig, context, executableManager, jobExecutor)
+                : new DefaultFetcherRunner(jobEngineConfig, context, executableManager, jobExecutor);
         fetcherPool.scheduleAtFixedRate(fetcher, pollSecond / 10, pollSecond, TimeUnit.SECONDS);
         hasStarted = true;
 
@@ -286,7 +242,7 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
             AbstractExecutable executable = executableManager.getJob(id);
             if (output.getState() == ExecutableState.RUNNING && executable instanceof DefaultChainedExecutable) {
                 try {
-                    if (!jobLock.isLocked(getLockPath(executable.getParam(SEGMENT_ID)))) {
+                    if (!jobLock.isLocked(getLockPath(executable.getId()))) {
                         executableManager.resumeRunningJobForce(executable.getId());
                         fetcherPool.schedule(fetcher, 0, TimeUnit.SECONDS);
                     }
@@ -334,8 +290,8 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
     }
 
     private void releaseAllLocks() {
-        for (String segmentId : segmentWithLocks) {
-            jobLock.unlock(getLockPath(segmentId));
+        for (String jobId : jobWithLocks) {
+            jobLock.unlock(getLockPath(jobId));
         }
     }
 
diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/FetcherRunner.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/FetcherRunner.java
new file mode 100644
index 0000000..d98ca33
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/FetcherRunner.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.impl.threadpool;
+
+import java.util.Map;
+
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.Executable;
+import org.apache.kylin.job.execution.ExecutableManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/** Base class of the job fetchers the scheduler runs periodically; keeps the shared job-pool bookkeeping. */
+public abstract class FetcherRunner implements Runnable {
+    private static final Logger logger = LoggerFactory.getLogger(FetcherRunner.class);
+    protected JobEngineConfig jobEngineConfig;
+    protected DefaultContext context;
+    protected ExecutableManager executableManager;
+    protected JobExecutor jobExecutor;
+    protected volatile boolean fetchFailed = false; // true after a fetch round threw (see PriorityFetcherRunner#run)
+
+    public FetcherRunner(JobEngineConfig jobEngineConfig, DefaultContext context, ExecutableManager executableManager,
+            JobExecutor jobExecutor) {
+        this.jobExecutor = jobExecutor;
+        this.executableManager = executableManager;
+        this.context = context;
+        this.jobEngineConfig = jobEngineConfig;
+    }
+
+    /** Returns true, after logging a warning, when the running jobs have reached the max concurrent job limit. */
+    protected boolean isJobPoolFull() {
+        Map<String, Executable> active = context.getRunningJobs();
+        if (active.size() < jobEngineConfig.getMaxConcurrentJobLimit()) {
+            return false;
+        }
+        logger.warn("There are too many jobs running, Job Fetch will wait until next schedule time");
+        return true;
+    }
+
+    /** Marks the job as running and hands it to the executor; the mark is removed again if that throws. */
+    protected void addToJobPool(AbstractExecutable executable, int priority) {
+        final String description = executable.toString();
+        logger.info(description + " prepare to schedule and its priority is " + priority);
+        try {
+            context.addRunningJob(executable);
+            jobExecutor.execute(executable);
+            logger.info(description + " scheduled");
+        } catch (Exception e) {
+            context.removeRunningJob(executable);
+            logger.warn(description + " fail to schedule", e);
+        }
+    }
+
+    @VisibleForTesting
+    void setFetchFailed(boolean fetchFailed) {
+        this.fetchFailed = fetchFailed;
+    }
+}
diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/JobExecutor.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/JobExecutor.java
new file mode 100644
index 0000000..d2efd22
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/JobExecutor.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.job.impl.threadpool;
+
+import org.apache.kylin.job.execution.AbstractExecutable;
+
+public interface JobExecutor { // callback through which a FetcherRunner hands a job over for execution
+    void execute(AbstractExecutable executable); // invoked by FetcherRunner.addToJobPool for each scheduled job
+}
diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/PriorityFetcherRunner.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/PriorityFetcherRunner.java
new file mode 100644
index 0000000..b562fac
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/PriorityFetcherRunner.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.impl.threadpool;
+
+import java.util.Comparator;
+import java.util.Map;
+import java.util.PriorityQueue;
+
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.common.util.SetThreadName;
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.Executable;
+import org.apache.kylin.job.execution.ExecutableManager;
+import org.apache.kylin.job.execution.ExecutableState;
+import org.apache.kylin.job.execution.Output;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Maps;
+
+/** Fetcher that schedules ready jobs by priority; a job's priority grows by one for every round it waits. */
+public class PriorityFetcherRunner extends FetcherRunner {
+    private static final Logger logger = LoggerFactory.getLogger(PriorityFetcherRunner.class);
+
+    private final PriorityQueue<Pair<AbstractExecutable, Integer>> jobPriorityQueue = new PriorityQueue<>(1,
+            new Comparator<Pair<AbstractExecutable, Integer>>() {
+                @Override
+                public int compare(Pair<AbstractExecutable, Integer> o1, Pair<AbstractExecutable, Integer> o2) {
+                    return Integer.compare(o2.getSecond(), o1.getSecond()); // highest priority first
+                }
+            });
+
+    public PriorityFetcherRunner(JobEngineConfig jobEngineConfig, DefaultContext context,
+            ExecutableManager executableManager, JobExecutor jobExecutor) {
+        super(jobEngineConfig, context, executableManager, jobExecutor);
+    }
+
+    /** Runs one fetch round: schedules aged queued jobs, rescans all jobs, then fills the pool by priority. */
+    @Override
+    public synchronized void run() {
+        try (SetThreadName ignored = new SetThreadName(//
+                "PriorityFetcherRunner %s", System.identityHashCode(this))) {//
+            // fetch job from jobPriorityQueue first to reduce chance to scan job list
+            Map<String, Integer> leftJobPriorities = Maps.newHashMap();
+            Pair<AbstractExecutable, Integer> executableWithPriority;
+            while ((executableWithPriority = jobPriorityQueue.peek()) != null
+                    // only jobs in jobPriorityQueue whose priority has reached the configured bar are taken here
+                    && executableWithPriority.getSecond() >= jobEngineConfig.getFetchQueuePriorityBar()) {
+                executableWithPriority = jobPriorityQueue.poll();
+                AbstractExecutable executable = executableWithPriority.getFirst();
+                int curPriority = executableWithPriority.getSecond();
+                // the job must have waited more than one round before it is scheduled straight from the queue
+                if (curPriority > executable.getDefaultPriority() + 1) {
+                    addToJobPool(executable, curPriority);
+                } else {
+                    leftJobPriorities.put(executable.getId(), curPriority + 1);
+                }
+            }
+
+            Map<String, Executable> runningJobs = context.getRunningJobs();
+            if (isJobPoolFull()) {
+                return;
+            }
+
+            while ((executableWithPriority = jobPriorityQueue.poll()) != null) {
+                leftJobPriorities.put(executableWithPriority.getFirst().getId(),
+                        executableWithPriority.getSecond() + 1);
+            }
+
+            int nRunning = 0, nReady = 0, nStopped = 0, nOthers = 0, nError = 0, nDiscarded = 0, nSUCCEED = 0;
+            for (final String id : executableManager.getAllJobIds()) {
+                if (runningJobs.containsKey(id)) {
+                    nRunning++;
+                    continue;
+                }
+
+                final Output output = executableManager.getOutput(id);
+                if (output.getState() != ExecutableState.READY) {
+                    // not runnable: count it by state; after a failed round, jobs in any other state are killed
+                    if (output.getState() == ExecutableState.SUCCEED) {
+                        nSUCCEED++;
+                    } else if (output.getState() == ExecutableState.ERROR) {
+                        nError++;
+                    } else if (output.getState() == ExecutableState.DISCARDED) {
+                        nDiscarded++;
+                    } else if (output.getState() == ExecutableState.STOPPED) {
+                        nStopped++;
+                    } else {
+                        if (fetchFailed) {
+                            executableManager.forceKillJob(id);
+                            nError++;
+                        } else {
+                            nOthers++;
+                        }
+                    }
+                    continue;
+                }
+
+                AbstractExecutable executable = executableManager.getJob(id);
+                if (!executable.isReady()) {
+                    nOthers++;
+                    continue;
+                }
+
+                nReady++;
+                Integer priority = leftJobPriorities.get(id);
+                if (priority == null) {
+                    priority = executable.getDefaultPriority();
+                }
+                jobPriorityQueue.add(new Pair<>(executable, priority));
+            }
+
+            // peek before polling: polling first would drop the head job whenever the pool is already full
+            while (jobPriorityQueue.peek() != null && !isJobPoolFull()) {
+                executableWithPriority = jobPriorityQueue.poll();
+                addToJobPool(executableWithPriority.getFirst(), executableWithPriority.getSecond());
+            }
+
+            fetchFailed = false;
+            logger.info("Priority Job Fetcher: " + nRunning + " running, " + runningJobs.size() + " actual running, "
+                    + nStopped + " stopped, " + nReady + " ready, " + jobPriorityQueue.size() + " waiting, " //
+                    + nSUCCEED + " already succeed, " + nError + " error, " + nDiscarded + " discarded, " + nOthers
+                    + " others");
+        } catch (Throwable th) {
+            fetchFailed = true; // this could happen when resource store is unavailable
+            logger.warn("Priority Job Fetcher caught a exception " + th, th);
+        }
+    }
+}
diff --git a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
index d7201f2..7c66f2c 100644
--- a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
+++ b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
@@ -129,7 +129,7 @@ public abstract class BaseSchedulerTest extends LocalFileMetadataTestCase {
                 AbstractExecutable job = execMgr.getJob(jobId);
                 ExecutableState status = job.getStatus();
                 if (status == ExecutableState.RUNNING) {
-                    scheduler.fetchFailed = true;
+                    scheduler.getFetcherRunner().setFetchFailed(true);
                     break;
                 }
                 Thread.sleep(1000);