You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by nj...@apache.org on 2017/12/02 17:24:06 UTC

[13/19] kylin git commit: APACHE-KYLIN-2735: Introduce an option to make job scheduler consider job priority

APACHE-KYLIN-2735: Introduce an option to make job scheduler consider job priority


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/7a54e58b
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/7a54e58b
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/7a54e58b

Branch: refs/heads/master
Commit: 7a54e58bfa19092e3756b00c701e01441dd00fda
Parents: eaf0537
Author: Zhong <nj...@apache.org>
Authored: Thu Aug 31 14:56:19 2017 +0800
Committer: Zhong <nj...@apache.org>
Committed: Sat Dec 2 23:43:43 2017 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/KylinConfigBase.java    |   8 ++
 .../kylin/job/engine/JobEngineConfig.java       |  14 +++
 .../kylin/job/execution/AbstractExecutable.java |   9 ++
 .../job/execution/CheckpointExecutable.java     |   7 ++
 .../job/execution/DefaultChainedExecutable.java |   7 ++
 .../job/impl/threadpool/DefaultScheduler.java   | 111 ++++++++++++++++++-
 .../org/apache/kylin/engine/mr/CubingJob.java   |  19 +++-
 7 files changed, 171 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/7a54e58b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 1a93dd4..e1a10a8 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -577,6 +577,14 @@ abstract public class KylinConfigBase implements Serializable {
         return Integer.parseInt(getOptional("kylin.job.scheduler.default", "0"));
     }
 
+    public boolean getSchedulerPriorityConsidered() { // feature toggle; "false" is the fallback passed to getOptional
+        return Boolean.parseBoolean(getOptional("kylin.job.scheduler.priority-considered", "false"));
+    }
+
+    public Integer getSchedulerPriorityBarFetchFromQueue() { // priority threshold; "20" is the fallback passed to getOptional
+        return Integer.parseInt(getOptional("kylin.job.scheduler.priority-bar-fetch-from-queue", "20"));
+    }
+
     public Integer getSchedulerPollIntervalSecond() {
         return Integer.parseInt(getOptional("kylin.job.scheduler.poll-interval-second", "30"));
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/7a54e58b/core-job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java b/core-job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java
index 6890557..9ba602f 100644
--- a/core-job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java
+++ b/core-job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java
@@ -105,6 +105,20 @@ public class JobEngineConfig {
         return config;
     }
 
+    /**
+     * @return whether job priority is considered when scheduling jobs
+     */
+    public boolean getJobPriorityConsidered() {
+        return config.getSchedulerPriorityConsidered();
+    }
+
+    /**
+     * @return the priority a queued job must reach (inclusive) before it is fetched directly from the job priority queue
+     */
+    public int getFetchQueuePriorityBar() {
+        return config.getSchedulerPriorityBarFetchFromQueue();
+    }
+
     public String getHdfsWorkingDirectory() {
         return config.getHdfsWorkingDirectory();
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/7a54e58b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
index 30b6421..a37cdc9 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
@@ -44,6 +44,8 @@ import com.google.common.collect.Maps;
  */
 public abstract class AbstractExecutable implements Executable, Idempotent {
 
+    public static final Integer DEFAULT_PRIORITY = 10;
+
     protected static final String SUBMITTER = "submitter";
     protected static final String NOTIFY_LIST = "notify_list";
     protected static final String START_TIME = "startTime";
@@ -389,6 +391,13 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
         return output.getState() == ExecutableState.READY;
     }
 
+    /**
+     * @return the default scheduling priority of this executable; the larger the value, the higher the priority
+     */
+    public int getDefaultPriority() {
+        return DEFAULT_PRIORITY;
+    }
+
     /*
     * discarded is triggered by JobService, the Scheduler is not awake of that
     *

http://git-wip-us.apache.org/repos/asf/kylin/blob/7a54e58b/core-job/src/main/java/org/apache/kylin/job/execution/CheckpointExecutable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/CheckpointExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/CheckpointExecutable.java
index 9864400..db477cb 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/CheckpointExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/CheckpointExecutable.java
@@ -29,6 +29,8 @@ public class CheckpointExecutable extends DefaultChainedExecutable {
 
     private static final Logger logger = LoggerFactory.getLogger(CheckpointExecutable.class);
 
+    public static final Integer DEFAULT_PRIORITY = 30;
+
     private static final String DEPLOY_ENV_NAME = "envName";
     private static final String PROJECT_INSTANCE_NAME = "projectName";
 
@@ -75,4 +77,9 @@ public class CheckpointExecutable extends DefaultChainedExecutable {
     public void setProjectName(String name) {
         setParam(PROJECT_INSTANCE_NAME, name);
     }
+
+    @Override
+    public int getDefaultPriority() {
+        return DEFAULT_PRIORITY; // this class's own constant (30), which hides the constant of the same name in the parent
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/7a54e58b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
index 54b3b0a..ff8dfee 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
@@ -31,6 +31,8 @@ import com.google.common.collect.Maps;
  */
 public class DefaultChainedExecutable extends AbstractExecutable implements ChainedExecutable {
 
+    public static final Integer DEFAULT_PRIORITY = 10;
+
     private final List<AbstractExecutable> subTasks = Lists.newArrayList();
 
     public DefaultChainedExecutable() {
@@ -167,4 +169,9 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai
         executable.setId(getId() + "-" + String.format("%02d", subTasks.size()));
         this.subTasks.add(executable);
     }
+
+    @Override
+    public int getDefaultPriority() {
+        return DEFAULT_PRIORITY; // this class's own constant (10), same value as AbstractExecutable.DEFAULT_PRIORITY
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/7a54e58b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
index 6ef9c81..287f215 100644
--- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
@@ -18,7 +18,9 @@
 
 package org.apache.kylin.job.impl.threadpool;
 
+import java.util.Comparator;
 import java.util.Map;
+import java.util.PriorityQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -29,6 +31,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.common.util.SetThreadName;
 import org.apache.kylin.job.Scheduler;
 import org.apache.kylin.job.engine.JobEngineConfig;
@@ -51,7 +54,7 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti
 
     private JobLock jobLock;
     private ExecutableManager executableManager;
-    private FetcherRunner fetcher;
+    private Runnable fetcher;
     private ScheduledExecutorService fetcherPool;
     private ExecutorService jobPool;
     private DefaultContext context;
@@ -69,6 +72,110 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti
         }
     }
 
+    private class FetcherRunnerWithPriority implements Runnable {
+        volatile PriorityQueue<Pair<AbstractExecutable, Integer>> jobPriorityQueue = new PriorityQueue<>(1, // head = largest priority
+                new Comparator<Pair<AbstractExecutable, Integer>>() {
+                    @Override
+                    public int compare(Pair<AbstractExecutable, Integer> o1, Pair<AbstractExecutable, Integer> o2) {
+                        return Integer.compare(o2.getSecond(), o1.getSecond()); // descending; returns 0 on ties as the contract requires
+                    }
+                });
+
+        private void addToJobPool(AbstractExecutable executable, int priority) { // submits one job to jobPool; priority is only logged
+            String jobDesc = executable.toString();
+            logger.info(jobDesc + " prepare to schedule and its priority is " + priority);
+            try {
+                context.addRunningJob(executable); // registered as running before submission
+                jobPool.execute(new JobRunner(executable));
+                logger.info(jobDesc + " scheduled");
+            } catch (Exception ex) {
+                context.removeRunningJob(executable); // roll back the registration when submission fails
+                logger.warn(jobDesc + " fail to schedule", ex);
+            }
+        }
+
+        /** One poll: dispatch aged queued jobs, rescan all job ids, then fill free slots in priority order. */
+        @Override
+        synchronized public void run() {
+            try {
+                Map<String, Executable> runningJobs = context.getRunningJobs();
+
+                // fetch job from jobPriorityQueue first to reduce chance to scan job list
+                Map<String, Integer> leftJobPriorities = Maps.newHashMap();
+                Pair<AbstractExecutable, Integer> executableWithPriority;
+                while ((executableWithPriority = jobPriorityQueue.peek()) != null
+                        // only jobs in jobPriorityQueue whose priority is at or above the threshold are taken here
+                        && executableWithPriority.getSecond() >= jobEngineConfig.getFetchQueuePriorityBar()) {
+                    executableWithPriority = jobPriorityQueue.poll();
+                    AbstractExecutable executable = executableWithPriority.getFirst();
+                    int curPriority = executableWithPriority.getSecond();
+                    // the job should wait more than one time (priority is raised by one per poll)
+                    if (curPriority > executable.getDefaultPriority() + 1) {
+                        addToJobPool(executable, curPriority); // NOTE(review): not gated by getMaxConcurrentJobLimit() - confirm intended
+                    } else {
+                        leftJobPriorities.put(executable.getId(), curPriority + 1);
+                    }
+                }
+
+                if (runningJobs.size() >= jobEngineConfig.getMaxConcurrentJobLimit()) {
+                    logger.warn("There are too many jobs running, Job Fetch will wait until next schedule time");
+                    return;
+                }
+
+                // drain the rest of the queue, remembering each job's priority raised by one for the rescan below
+                while ((executableWithPriority = jobPriorityQueue.poll()) != null) {
+                    leftJobPriorities.put(executableWithPriority.getFirst().getId(),
+                            executableWithPriority.getSecond() + 1);
+                }
+
+                int nRunning = 0, nReady = 0, nStopped = 0, nOthers = 0, nError = 0, nDiscarded = 0, nSUCCEED = 0;
+                for (final String id : executableManager.getAllJobIds()) {
+                    if (runningJobs.containsKey(id)) {
+                        nRunning++;
+                        continue;
+                    }
+
+                    AbstractExecutable executable = executableManager.getJob(id);
+                    if (!executable.isReady()) {
+                        final Output output = executableManager.getOutput(id);
+                        if (output.getState() == ExecutableState.DISCARDED) {
+                            nDiscarded++;
+                        } else if (output.getState() == ExecutableState.ERROR) {
+                            nError++;
+                        } else if (output.getState() == ExecutableState.SUCCEED) {
+                            nSUCCEED++;
+                        } else if (output.getState() == ExecutableState.STOPPED) {
+                            nStopped++;
+                        } else {
+                            nOthers++;
+                        }
+                        continue;
+                    }
+
+                    nReady++;
+                    Integer priority = leftJobPriorities.get(id);
+                    if (priority == null) {
+                        priority = executable.getDefaultPriority();
+                    }
+                    jobPriorityQueue.add(new Pair<>(executable, priority));
+                }
+
+                // hand out the remaining slots starting from the head of the queue
+                while (runningJobs.size() < jobEngineConfig.getMaxConcurrentJobLimit()
+                        && (executableWithPriority = jobPriorityQueue.poll()) != null) {
+                    addToJobPool(executableWithPriority.getFirst(), executableWithPriority.getSecond());
+                }
+
+                logger.info("Job Fetcher: " + nRunning + " running, " + runningJobs.size() + " actual running, "
+                        + nStopped + " stopped, " + nReady + " ready, " + jobPriorityQueue.size() + " waiting, " //
+                        + nSUCCEED + " already succeed, " + nError + " error, " + nDiscarded + " discarded, " + nOthers
+                        + " others");
+            } catch (Exception e) {
+                logger.warn("Job Fetcher caught a exception ", e); // pass the throwable so the stack trace is logged
+            }
+        }
+    }
+
     private class FetcherRunner implements Runnable {
 
         @Override
@@ -222,7 +329,7 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti
 
         int pollSecond = jobEngineConfig.getPollIntervalSecond();
         logger.info("Fetching jobs every {} seconds", pollSecond);
-        fetcher = new FetcherRunner();
+        fetcher = jobEngineConfig.getJobPriorityConsidered() ? new FetcherRunnerWithPriority() : new FetcherRunner();
         fetcherPool.scheduleAtFixedRate(fetcher, pollSecond / 10, pollSecond, TimeUnit.SECONDS);
         hasStarted = true;
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/7a54e58b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
index 71b62a0..abf7e02 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
@@ -65,12 +65,18 @@ public class CubingJob extends DefaultChainedExecutable {
     }
 
     public enum CubingJobTypeEnum {
-        BUILD("BUILD"), OPTIMIZE("OPTIMIZE"), MERGE("MERGE");
+        BUILD("BUILD", 20), OPTIMIZE("OPTIMIZE", 5), MERGE("MERGE", 25);
 
         private final String name;
+        private final int defaultPriority;
 
-        CubingJobTypeEnum(String name) {
+        CubingJobTypeEnum(String name, int priority) {
             this.name = name;
+            this.defaultPriority = priority;
+        }
+
+        public int getDefaultPriority() {
+            return defaultPriority;
         }
 
         public String toString() {
@@ -151,6 +157,15 @@ public class CubingJob extends DefaultChainedExecutable {
         super();
     }
 
+    /**
+     * Resolves the default priority from this job's type; when getByName yields null, the parent's default is used.
+     */
+    @Override
+    public int getDefaultPriority() {
+        final CubingJobTypeEnum type = CubingJobTypeEnum.getByName(getJobType());
+        return type != null ? type.getDefaultPriority() : super.getDefaultPriority();
+    }
+
     protected void setDeployEnvName(String name) {
         setParam(DEPLOY_ENV_NAME, name);
     }