You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by nj...@apache.org on 2017/12/02 17:24:06 UTC
[13/19] kylin git commit: APACHE-KYLIN-2735: Introduce an option to
make job scheduler consider job priority
APACHE-KYLIN-2735: Introduce an option to make job scheduler consider job priority
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/7a54e58b
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/7a54e58b
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/7a54e58b
Branch: refs/heads/master
Commit: 7a54e58bfa19092e3756b00c701e01441dd00fda
Parents: eaf0537
Author: Zhong <nj...@apache.org>
Authored: Thu Aug 31 14:56:19 2017 +0800
Committer: Zhong <nj...@apache.org>
Committed: Sat Dec 2 23:43:43 2017 +0800
----------------------------------------------------------------------
.../apache/kylin/common/KylinConfigBase.java | 8 ++
.../kylin/job/engine/JobEngineConfig.java | 14 +++
.../kylin/job/execution/AbstractExecutable.java | 9 ++
.../job/execution/CheckpointExecutable.java | 7 ++
.../job/execution/DefaultChainedExecutable.java | 7 ++
.../job/impl/threadpool/DefaultScheduler.java | 111 ++++++++++++++++++-
.../org/apache/kylin/engine/mr/CubingJob.java | 19 +++-
7 files changed, 171 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/7a54e58b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 1a93dd4..e1a10a8 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -577,6 +577,14 @@ abstract public class KylinConfigBase implements Serializable {
return Integer.parseInt(getOptional("kylin.job.scheduler.default", "0"));
}
+ /**
+  * Reads the {@code kylin.job.scheduler.priority-considered} option and parses it as a boolean,
+  * passing {@code "false"} to {@code getOptional} as the fallback value.
+  *
+  * @return whether the job scheduler should consider job priority
+  */
+ public boolean getSchedulerPriorityConsidered() {
+ return Boolean.parseBoolean(getOptional("kylin.job.scheduler.priority-considered", "false"));
+ }
+
+ /**
+  * Reads the {@code kylin.job.scheduler.priority-bar-fetch-from-queue} option and parses it as an
+  * integer, passing {@code "20"} to {@code getOptional} as the fallback value.
+  *
+  * @return the priority bar used when fetching jobs from the job priority queue
+  */
+ public Integer getSchedulerPriorityBarFetchFromQueue() {
+ return Integer.parseInt(getOptional("kylin.job.scheduler.priority-bar-fetch-from-queue", "20"));
+ }
+
public Integer getSchedulerPollIntervalSecond() {
return Integer.parseInt(getOptional("kylin.job.scheduler.poll-interval-second", "30"));
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/7a54e58b/core-job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java b/core-job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java
index 6890557..9ba602f 100644
--- a/core-job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java
+++ b/core-job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java
@@ -105,6 +105,20 @@ public class JobEngineConfig {
return config;
}
+ /**
+ * @return whether job priority is considered when scheduling jobs
+ */
+ public boolean getJobPriorityConsidered() {
+ return config.getSchedulerPriorityConsidered();
+ }
+
+ /**
+ * @return the priority bar for fetching jobs from the job priority queue; the priority-aware
+ *         fetcher only takes a queued job directly when its priority is at least this value
+ */
+ public int getFetchQueuePriorityBar() {
+ return config.getSchedulerPriorityBarFetchFromQueue();
+ }
+
public String getHdfsWorkingDirectory() {
return config.getHdfsWorkingDirectory();
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/7a54e58b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
index 30b6421..a37cdc9 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
@@ -44,6 +44,8 @@ import com.google.common.collect.Maps;
*/
public abstract class AbstractExecutable implements Executable, Idempotent {
+ public static final Integer DEFAULT_PRIORITY = 10;
+
protected static final String SUBMITTER = "submitter";
protected static final String NOTIFY_LIST = "notify_list";
protected static final String START_TIME = "startTime";
@@ -389,6 +391,13 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
return output.getState() == ExecutableState.READY;
}
+ /**
+ * The larger the value, the higher the priority.
+ *
+ * @return the priority a job of this type starts with, {@link #DEFAULT_PRIORITY} unless overridden
+ */
+ public int getDefaultPriority() {
+ return DEFAULT_PRIORITY;
+ }
+
/*
* discarded is triggered by JobService, the Scheduler is not awake of that
*
http://git-wip-us.apache.org/repos/asf/kylin/blob/7a54e58b/core-job/src/main/java/org/apache/kylin/job/execution/CheckpointExecutable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/CheckpointExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/CheckpointExecutable.java
index 9864400..db477cb 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/CheckpointExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/CheckpointExecutable.java
@@ -29,6 +29,8 @@ public class CheckpointExecutable extends DefaultChainedExecutable {
private static final Logger logger = LoggerFactory.getLogger(CheckpointExecutable.class);
+ public static final Integer DEFAULT_PRIORITY = 30;
+
private static final String DEPLOY_ENV_NAME = "envName";
private static final String PROJECT_INSTANCE_NAME = "projectName";
@@ -75,4 +77,9 @@ public class CheckpointExecutable extends DefaultChainedExecutable {
public void setProjectName(String name) {
setParam(PROJECT_INSTANCE_NAME, name);
}
+
+ /**
+  * @return this class's own {@code DEFAULT_PRIORITY} constant (declared as 30 in this class)
+  */
+ @Override
+ public int getDefaultPriority() {
+ return DEFAULT_PRIORITY;
+ }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/7a54e58b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
index 54b3b0a..ff8dfee 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
@@ -31,6 +31,8 @@ import com.google.common.collect.Maps;
*/
public class DefaultChainedExecutable extends AbstractExecutable implements ChainedExecutable {
+ public static final Integer DEFAULT_PRIORITY = 10;
+
private final List<AbstractExecutable> subTasks = Lists.newArrayList();
public DefaultChainedExecutable() {
@@ -167,4 +169,9 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai
executable.setId(getId() + "-" + String.format("%02d", subTasks.size()));
this.subTasks.add(executable);
}
+
+ /**
+  * @return this class's own {@code DEFAULT_PRIORITY} constant (declared as 10 in this class)
+  */
+ @Override
+ public int getDefaultPriority() {
+ return DEFAULT_PRIORITY;
+ }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/7a54e58b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
index 6ef9c81..287f215 100644
--- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
@@ -18,7 +18,9 @@
package org.apache.kylin.job.impl.threadpool;
+import java.util.Comparator;
import java.util.Map;
+import java.util.PriorityQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -29,6 +31,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.kylin.common.util.Pair;
import org.apache.kylin.common.util.SetThreadName;
import org.apache.kylin.job.Scheduler;
import org.apache.kylin.job.engine.JobEngineConfig;
@@ -51,7 +54,7 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti
private JobLock jobLock;
private ExecutableManager executableManager;
- private FetcherRunner fetcher;
+ private Runnable fetcher;
private ScheduledExecutorService fetcherPool;
private ExecutorService jobPool;
private DefaultContext context;
@@ -69,6 +72,110 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti
}
}
+ /**
+  * Job fetcher that dispatches ready jobs in descending priority order (larger value = higher priority).
+  * <p>
+  * Ready jobs that could not be dispatched stay in {@link #jobPriorityQueue}. Every time they are carried
+  * over to the next scan their priority is increased by one, so jobs that keep waiting gradually move
+  * ahead of jobs that have only just become ready.
+  */
+ private class FetcherRunnerWithPriority implements Runnable {
+     /** Waiting jobs paired with their current priority; the head is the entry with the largest priority. */
+     volatile PriorityQueue<Pair<AbstractExecutable, Integer>> jobPriorityQueue = new PriorityQueue<>(1,
+             new Comparator<Pair<AbstractExecutable, Integer>>() {
+                 @Override
+                 public int compare(Pair<AbstractExecutable, Integer> o1, Pair<AbstractExecutable, Integer> o2) {
+                     // Descending by priority. Integer.compare yields 0 for equal priorities, which keeps
+                     // the Comparator contract; a "greater ? -1 : 1" form can never report equality.
+                     return Integer.compare(o2.getSecond(), o1.getSecond());
+                 }
+             });
+
+     /**
+      * Registers the job as running in the scheduler context and submits it to the job pool.
+      * If either step throws, the job is removed from the running set again and the failure is logged.
+      *
+      * @param executable the job to dispatch
+      * @param priority the priority the job was dispatched with; only used in the log message
+      */
+     private void addToJobPool(AbstractExecutable executable, int priority) {
+         String jobDesc = executable.toString();
+         logger.info(jobDesc + " prepare to schedule and its priority is " + priority);
+         try {
+             context.addRunningJob(executable);
+             jobPool.execute(new JobRunner(executable));
+             logger.info(jobDesc + " scheduled");
+         } catch (Exception ex) {
+             context.removeRunningJob(executable);
+             logger.warn(jobDesc + " fail to schedule", ex);
+         }
+     }
+
+     /**
+      * Runs one fetch cycle:
+      * <ol>
+      * <li>take queued jobs whose priority has reached the configured bar; dispatch those that have
+      * waited long enough and remember the others with their priority increased by one;</li>
+      * <li>stop if the concurrent job limit is already reached;</li>
+      * <li>drain the rest of the queue, remembering each job's priority increased by one;</li>
+      * <li>scan all job ids, count them by state and queue every ready job with its remembered
+      * priority, or its default priority if none is remembered;</li>
+      * <li>dispatch from the queue in priority order until the concurrent job limit is reached.</li>
+      * </ol>
+      * Any exception is caught and logged so that it does not propagate out of this method.
+      */
+     @Override
+     synchronized public void run() {
+         try {
+             Map<String, Executable> runningJobs = context.getRunningJobs();
+
+             // fetch job from jobPriorityQueue first to reduce chance to scan job list
+             Map<String, Integer> leftJobPriorities = Maps.newHashMap();
+             Pair<AbstractExecutable, Integer> executableWithPriority;
+             while ((executableWithPriority = jobPriorityQueue.peek()) != null
+                     // only jobs whose priority has reached the configured bar are taken in this pass
+                     && executableWithPriority.getSecond() >= jobEngineConfig.getFetchQueuePriorityBar()) {
+                 executableWithPriority = jobPriorityQueue.poll();
+                 AbstractExecutable executable = executableWithPriority.getFirst();
+                 int curPriority = executableWithPriority.getSecond();
+                 // the job should have waited more than one cycle before it is dispatched from here
+                 // NOTE(review): this pass dispatches without checking the concurrent job limit - confirm intended
+                 if (curPriority > executable.getDefaultPriority() + 1) {
+                     addToJobPool(executable, curPriority);
+                 } else {
+                     leftJobPriorities.put(executable.getId(), curPriority + 1);
+                 }
+             }
+
+             if (runningJobs.size() >= jobEngineConfig.getMaxConcurrentJobLimit()) {
+                 // NOTE(review): priorities collected in leftJobPriorities above are dropped on this early
+                 // return, so those jobs restart from their default priority on the next full scan - confirm
+                 logger.warn("There are too many jobs running, Job Fetch will wait until next schedule time");
+                 return;
+             }
+
+             // carry over everything still waiting, aged by one, before the queue is rebuilt from the scan
+             while ((executableWithPriority = jobPriorityQueue.poll()) != null) {
+                 leftJobPriorities.put(executableWithPriority.getFirst().getId(),
+                         executableWithPriority.getSecond() + 1);
+             }
+
+             int nRunning = 0, nReady = 0, nStopped = 0, nOthers = 0, nError = 0, nDiscarded = 0, nSUCCEED = 0;
+             for (final String id : executableManager.getAllJobIds()) {
+                 if (runningJobs.containsKey(id)) {
+                     nRunning++;
+                     continue;
+                 }
+
+                 AbstractExecutable executable = executableManager.getJob(id);
+                 if (!executable.isReady()) {
+                     // not runnable: only count it by its current state
+                     final Output output = executableManager.getOutput(id);
+                     if (output.getState() == ExecutableState.DISCARDED) {
+                         nDiscarded++;
+                     } else if (output.getState() == ExecutableState.ERROR) {
+                         nError++;
+                     } else if (output.getState() == ExecutableState.SUCCEED) {
+                         nSUCCEED++;
+                     } else if (output.getState() == ExecutableState.STOPPED) {
+                         nStopped++;
+                     } else {
+                         nOthers++;
+                     }
+                     continue;
+                 }
+
+                 nReady++;
+                 // a job that was already waiting keeps its aged priority; a newly ready job starts at its default
+                 Integer priority = leftJobPriorities.get(id);
+                 if (priority == null) {
+                     priority = executable.getDefaultPriority();
+                 }
+                 jobPriorityQueue.add(new Pair<>(executable, priority));
+             }
+
+             // dispatch in priority order; whatever does not fit stays queued for the next cycle
+             while (runningJobs.size() < jobEngineConfig.getMaxConcurrentJobLimit()
+                     && (executableWithPriority = jobPriorityQueue.poll()) != null) {
+                 addToJobPool(executableWithPriority.getFirst(), executableWithPriority.getSecond());
+             }
+
+             logger.info("Job Fetcher: " + nRunning + " running, " + runningJobs.size() + " actual running, "
+                     + nStopped + " stopped, " + nReady + " ready, " + jobPriorityQueue.size() + " waiting, " //
+                     + nSUCCEED + " already succeed, " + nError + " error, " + nDiscarded + " discarded, " + nOthers
+                     + " others");
+         } catch (Exception e) {
+             // pass the exception as the throwable argument so the stack trace is logged
+             logger.warn("Job Fetcher caught a exception ", e);
+         }
+     }
+ }
+
private class FetcherRunner implements Runnable {
@Override
@@ -222,7 +329,7 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti
int pollSecond = jobEngineConfig.getPollIntervalSecond();
logger.info("Fetching jobs every {} seconds", pollSecond);
- fetcher = new FetcherRunner();
+ fetcher = jobEngineConfig.getJobPriorityConsidered() ? new FetcherRunnerWithPriority() : new FetcherRunner();
fetcherPool.scheduleAtFixedRate(fetcher, pollSecond / 10, pollSecond, TimeUnit.SECONDS);
hasStarted = true;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/7a54e58b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
index 71b62a0..abf7e02 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
@@ -65,12 +65,18 @@ public class CubingJob extends DefaultChainedExecutable {
}
public enum CubingJobTypeEnum {
- BUILD("BUILD"), OPTIMIZE("OPTIMIZE"), MERGE("MERGE");
+ BUILD("BUILD", 20), OPTIMIZE("OPTIMIZE", 5), MERGE("MERGE", 25);
private final String name;
+ private final int defaultPriority;
- CubingJobTypeEnum(String name) {
+ CubingJobTypeEnum(String name, int priority) {
this.name = name;
+ this.defaultPriority = priority;
+ }
+
+ public int getDefaultPriority() {
+ return defaultPriority;
}
public String toString() {
@@ -151,6 +157,15 @@ public class CubingJob extends DefaultChainedExecutable {
super();
}
+ /**
+  * Resolves this job's type via {@code CubingJobTypeEnum.getByName(getJobType())} and returns that
+  * type's default priority.
+  *
+  * @return the default priority of the resolved job type, or the superclass default when the lookup
+  *         returns {@code null}
+  */
+ @Override
+ public int getDefaultPriority() {
+ CubingJobTypeEnum jobType = CubingJobTypeEnum.getByName(getJobType());
+ if (jobType == null) {
+ return super.getDefaultPriority();
+ }
+ return jobType.getDefaultPriority();
+ }
+
protected void setDeployEnvName(String name) {
setParam(DEPLOY_ENV_NAME, name);
}