You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by nj...@apache.org on 2018/06/23 23:25:14 UTC
[kylin] 01/02: KYLIN-3421 improve the fetcher runner in job
scheduler
This is an automated email from the ASF dual-hosted git repository.
nju_yaho pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit eddab3791b3322e8f8c42d5254e03c4f334897af
Author: Zhong <nj...@apache.org>
AuthorDate: Wed Jun 20 17:46:10 2018 +0800
KYLIN-3421 improve the fetcher runner in job scheduler
---
.../job/impl/threadpool/DefaultFetcherRunner.java | 104 ++++++++++
.../job/impl/threadpool/DefaultScheduler.java | 210 ++-------------------
.../job/impl/threadpool/DistributedScheduler.java | 116 ++++--------
.../kylin/job/impl/threadpool/FetcherRunner.java | 77 ++++++++
.../kylin/job/impl/threadpool/JobExecutor.java | 25 +++
.../job/impl/threadpool/PriorityFetcherRunner.java | 146 ++++++++++++++
.../job/impl/threadpool/BaseSchedulerTest.java | 2 +-
7 files changed, 401 insertions(+), 279 deletions(-)
diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultFetcherRunner.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultFetcherRunner.java
new file mode 100644
index 0000000..e5f15fe
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultFetcherRunner.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.job.impl.threadpool;
+
+import java.util.Map;
+
+import org.apache.kylin.common.util.SetThreadName;
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.Executable;
+import org.apache.kylin.job.execution.ExecutableManager;
+import org.apache.kylin.job.execution.ExecutableState;
+import org.apache.kylin.job.execution.Output;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+/** Fetcher that walks all job ids once per run and submits every READY job at its default priority. */
+public class DefaultFetcherRunner extends FetcherRunner {
+
+ private static final Logger logger = LoggerFactory.getLogger(DefaultFetcherRunner.class);
+
+ public DefaultFetcherRunner(JobEngineConfig jobEngineConfig, DefaultContext context,
+ ExecutableManager executableManager, JobExecutor jobExecutor) {
+ super(jobEngineConfig, context, executableManager, jobExecutor);
+ }
+ // One fetch round; the method is synchronized, so rounds on the same runner never overlap.
+ @Override
+ synchronized public void run() {
+ try (SetThreadName ignored = new SetThreadName(//
+ "FetcherRunner %s", System.identityHashCode(this))) {//
+ // Skip the whole scan when the running-job map is already at the concurrency limit.
+ Map<String, Executable> runningJobs = context.getRunningJobs();
+ if (isJobPoolFull()) {
+ return;
+ }
+
+ int nRunning = 0, nReady = 0, nStopped = 0, nOthers = 0, nError = 0, nDiscarded = 0, nSUCCEED = 0;
+ for (final String id : executableManager.getAllJobIds()) {
+ if (isJobPoolFull()) {
+ return; // mid-scan exit: skips the summary log and leaves fetchFailed unchanged
+ }
+ if (runningJobs.containsKey(id)) {
+ // Already tracked in this context's running-job map: count it and move on.
+ nRunning++;
+ continue;
+ }
+
+ final Output output = executableManager.getOutput(id);
+ if ((output.getState() != ExecutableState.READY)) {
+ // Not READY, so not schedulable here: classify by state for the summary log.
+ if (output.getState() == ExecutableState.SUCCEED) {
+ nSUCCEED++;
+ } else if (output.getState() == ExecutableState.ERROR) {
+ nError++;
+ } else if (output.getState() == ExecutableState.DISCARDED) {
+ nDiscarded++;
+ } else if (output.getState() == ExecutableState.STOPPED) {
+ nStopped++;
+ } else {
+ if (fetchFailed) { // flag is set after a failed round: force-kill jobs left in any other state
+ executableManager.forceKillJob(id);
+ nError++;
+ } else {
+ nOthers++;
+ }
+ }
+ continue;
+ }
+ // The stored output says READY; the executable itself must also report ready.
+ final AbstractExecutable executable = executableManager.getJob(id);
+ if (!executable.isReady()) {
+ nOthers++;
+ continue;
+ }
+
+ nReady++;
+ addToJobPool(executable, executable.getDefaultPriority());
+ }
+
+ fetchFailed = false;
+ logger.info("Job Fetcher: " + nRunning + " should running, " + runningJobs.size() + " actual running, "
+ + nStopped + " stopped, " + nReady + " ready, " + nSUCCEED + " already succeed, " + nError
+ + " error, " + nDiscarded + " discarded, " + nOthers + " others");
+ } catch (Throwable th) {
+ fetchFailed = true; // this could happen when resource store is unavailable
+ logger.warn("Job Fetcher caught a exception ", th);
+ }
+ }
+}
diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
index 920601d..c566408 100644
--- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
@@ -18,9 +18,6 @@
package org.apache.kylin.job.impl.threadpool;
-import java.util.Comparator;
-import java.util.Map;
-import java.util.PriorityQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -31,7 +28,6 @@ import java.util.concurrent.TimeUnit;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
-import org.apache.kylin.common.util.Pair;
import org.apache.kylin.common.util.SetThreadName;
import org.apache.kylin.job.Scheduler;
import org.apache.kylin.job.engine.JobEngineConfig;
@@ -40,8 +36,6 @@ import org.apache.kylin.job.exception.SchedulerException;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.Executable;
import org.apache.kylin.job.execution.ExecutableManager;
-import org.apache.kylin.job.execution.ExecutableState;
-import org.apache.kylin.job.execution.Output;
import org.apache.kylin.job.lock.JobLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -84,7 +78,7 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti
private JobLock jobLock;
private ExecutableManager executableManager;
- private Runnable fetcher;
+ private FetcherRunner fetcher;
private ScheduledExecutorService fetcherPool;
private ExecutorService jobPool;
private DefaultContext context;
@@ -92,7 +86,6 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti
private static final Logger logger = LoggerFactory.getLogger(DefaultScheduler.class);
private volatile boolean initialized = false;
private volatile boolean hasStarted = false;
- volatile boolean fetchFailed = false;
private JobEngineConfig jobEngineConfig;
public DefaultScheduler() {
@@ -101,195 +94,8 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti
}
}
- private class FetcherRunnerWithPriority implements Runnable {
- volatile PriorityQueue<Pair<AbstractExecutable, Integer>> jobPriorityQueue = new PriorityQueue<>(1,
- new Comparator<Pair<AbstractExecutable, Integer>>() {
- @Override
- public int compare(Pair<AbstractExecutable, Integer> o1, Pair<AbstractExecutable, Integer> o2) {
- return o1.getSecond() > o2.getSecond() ? -1 : 1;
- }
- });
-
- private void addToJobPool(AbstractExecutable executable, int priority) {
- String jobDesc = executable.toString();
- logger.info(jobDesc + " prepare to schedule and its priority is " + priority);
- try {
- context.addRunningJob(executable);
- jobPool.execute(new JobRunner(executable));
- logger.info(jobDesc + " scheduled");
- } catch (Exception ex) {
- context.removeRunningJob(executable);
- logger.warn(jobDesc + " fail to schedule", ex);
- }
- }
-
- @Override
- synchronized public void run() {
- try (SetThreadName ignored = new SetThreadName(//
- "Scheduler %s PriorityFetcherRunner %s"//
- , System.identityHashCode(DefaultScheduler.this)//
- , System.identityHashCode(this)//
- )) {//
- // logger.debug("Job Fetcher is running...");
- Map<String, Executable> runningJobs = context.getRunningJobs();
-
- // fetch job from jobPriorityQueue first to reduce chance to scan job list
- Map<String, Integer> leftJobPriorities = Maps.newHashMap();
- Pair<AbstractExecutable, Integer> executableWithPriority;
- while ((executableWithPriority = jobPriorityQueue.peek()) != null
- // the priority of jobs in pendingJobPriorities should be above a threshold
- && executableWithPriority.getSecond() >= jobEngineConfig.getFetchQueuePriorityBar()) {
- executableWithPriority = jobPriorityQueue.poll();
- AbstractExecutable executable = executableWithPriority.getFirst();
- int curPriority = executableWithPriority.getSecond();
- // the job should wait more than one time
- if (curPriority > executable.getDefaultPriority() + 1) {
- addToJobPool(executable, curPriority);
- } else {
- leftJobPriorities.put(executable.getId(), curPriority + 1);
- }
- }
-
- if (runningJobs.size() >= jobEngineConfig.getMaxConcurrentJobLimit()) {
- logger.warn("There are too many jobs running, Job Fetch will wait until next schedule time");
- return;
- }
-
- while ((executableWithPriority = jobPriorityQueue.poll()) != null) {
- leftJobPriorities.put(executableWithPriority.getFirst().getId(),
- executableWithPriority.getSecond() + 1);
- }
-
- int nRunning = 0, nReady = 0, nStopped = 0, nOthers = 0, nError = 0, nDiscarded = 0, nSUCCEED = 0;
- for (final String id : executableManager.getAllJobIds()) {
- if (runningJobs.containsKey(id)) {
- // logger.debug("Job id:" + id + " is already running");
- nRunning++;
- continue;
- }
-
- AbstractExecutable executable = executableManager.getJob(id);
- if (!executable.isReady()) {
- final Output output = executableManager.getOutput(id);
- // logger.debug("Job id:" + id + " not runnable");
- if (output.getState() == ExecutableState.DISCARDED) {
- nDiscarded++;
- } else if (output.getState() == ExecutableState.ERROR) {
- nError++;
- } else if (output.getState() == ExecutableState.SUCCEED) {
- nSUCCEED++;
- } else if (output.getState() == ExecutableState.STOPPED) {
- nStopped++;
- } else {
- nOthers++;
- }
- continue;
- }
-
- nReady++;
- Integer priority = leftJobPriorities.get(id);
- if (priority == null) {
- priority = executable.getDefaultPriority();
- }
- jobPriorityQueue.add(new Pair<>(executable, priority));
- }
-
- while (runningJobs.size() < jobEngineConfig.getMaxConcurrentJobLimit()
- && (executableWithPriority = jobPriorityQueue.poll()) != null) {
- addToJobPool(executableWithPriority.getFirst(), executableWithPriority.getSecond());
- }
-
- logger.info("Priority Job Fetcher: " + nRunning + " running, " + runningJobs.size() + " actual running, "
- + nStopped + " stopped, " + nReady + " ready, " + jobPriorityQueue.size() + " waiting, " //
- + nSUCCEED + " already succeed, " + nError + " error, " + nDiscarded + " discarded, " + nOthers
- + " others");
- } catch (Throwable th) {
- logger.warn("Priority Job Fetcher caught a exception " + th);
- }
- }
- }
-
- private class FetcherRunner implements Runnable {
-
- @Override
- synchronized public void run() {
- try (SetThreadName ignored = new SetThreadName(//
- "Scheduler %s FetcherRunner %s"//
- , System.identityHashCode(DefaultScheduler.this)//
- , System.identityHashCode(this)//
- )) {//
- // logger.debug("Job Fetcher is running...");
- Map<String, Executable> runningJobs = context.getRunningJobs();
- if (isJobPoolFull()) {
- return;
- }
-
- int nRunning = 0, nReady = 0, nStopped = 0, nOthers = 0, nError = 0, nDiscarded = 0, nSUCCEED = 0;
- for (final String id : executableManager.getAllJobIds()) {
- if (isJobPoolFull()) {
- return;
- }
- if (runningJobs.containsKey(id)) {
- // logger.debug("Job id:" + id + " is already running");
- nRunning++;
- continue;
- }
- final AbstractExecutable executable = executableManager.getJob(id);
- if (!executable.isReady()) {
- final Output output = executableManager.getOutput(id);
- // logger.debug("Job id:" + id + " not runnable");
- if (output.getState() == ExecutableState.DISCARDED) {
- nDiscarded++;
- } else if (output.getState() == ExecutableState.ERROR) {
- nError++;
- } else if (output.getState() == ExecutableState.SUCCEED) {
- nSUCCEED++;
- } else if (output.getState() == ExecutableState.STOPPED) {
- nStopped++;
- } else {
- if (fetchFailed) {
- executableManager.forceKillJob(id);
- nError++;
- } else {
- nOthers++;
- }
- }
- continue;
- }
- nReady++;
- String jobDesc = null;
- try {
- jobDesc = executable.toString();
- logger.info(jobDesc + " prepare to schedule");
- context.addRunningJob(executable);
- jobPool.execute(new JobRunner(executable));
- logger.info(jobDesc + " scheduled");
- } catch (Exception ex) {
- if (executable != null)
- context.removeRunningJob(executable);
- logger.warn(jobDesc + " fail to schedule", ex);
- }
- }
-
- fetchFailed = false;
- logger.info("Job Fetcher: " + nRunning + " should running, " + runningJobs.size() + " actual running, "
- + nStopped + " stopped, " + nReady + " ready, " + nSUCCEED + " already succeed, " + nError
- + " error, " + nDiscarded + " discarded, " + nOthers + " others");
- } catch (Throwable th) {
- fetchFailed = true; // this could happen when resource store is unavailable
- logger.warn("Job Fetcher caught a exception ", th);
- }
- }
- }
-
- private boolean isJobPoolFull() {
- Map<String, Executable> runningJobs = context.getRunningJobs();
- if (runningJobs.size() >= jobEngineConfig.getMaxConcurrentJobLimit()) {
- logger.warn("There are too many jobs running, Job Fetch will wait until next schedule time");
- return true;
- }
-
- return false;
+ public FetcherRunner getFetcherRunner() {
+ return fetcher;
}
private class JobRunner implements Runnable {
@@ -367,7 +173,15 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti
int pollSecond = jobEngineConfig.getPollIntervalSecond();
logger.info("Fetching jobs every {} seconds", pollSecond);
- fetcher = jobEngineConfig.getJobPriorityConsidered() ? new FetcherRunnerWithPriority() : new FetcherRunner();
+ JobExecutor jobExecutor = new JobExecutor() {
+ @Override
+ public void execute(AbstractExecutable executable) {
+ jobPool.execute(new JobRunner(executable));
+ }
+ };
+ fetcher = jobEngineConfig.getJobPriorityConsidered()
+ ? new PriorityFetcherRunner(jobEngineConfig, context, executableManager, jobExecutor)
+ : new DefaultFetcherRunner(jobEngineConfig, context, executableManager, jobExecutor);
logger.info("Creating fetcher pool instance:" + System.identityHashCode(fetcher));
fetcherPool.scheduleAtFixedRate(fetcher, pollSecond / 10, pollSecond, TimeUnit.SECONDS);
hasStarted = true;
diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
index 055de4d..cb4d815 100644
--- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
@@ -20,7 +20,6 @@ package org.apache.kylin.job.impl.threadpool;
import java.io.Closeable;
import java.io.IOException;
-import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
@@ -63,7 +62,6 @@ import com.google.common.collect.Maps;
public class DistributedScheduler implements Scheduler<AbstractExecutable>, ConnectionStateListener {
private static final Logger logger = LoggerFactory.getLogger(DistributedScheduler.class);
- private final static String SEGMENT_ID = "segmentId";
public static final String ZOOKEEPER_LOCK_PATH = "/job_engine/lock"; // note ZookeeperDistributedLock will ensure zk path prefix: /${kylin.env.zookeeper-base-path}/metadata
public static DistributedScheduler getInstance(KylinConfig config) {
@@ -86,57 +84,13 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
private DistributedLock jobLock;
private Closeable lockWatch;
- //keep all segments having running job
- private final Set<String> segmentWithLocks = new CopyOnWriteArraySet<>();
+ //keep the ids of all jobs whose lock this scheduler has acquired and not yet released
+ private final Set<String> jobWithLocks = new CopyOnWriteArraySet<>();
private volatile boolean initialized = false;
private volatile boolean hasStarted = false;
private JobEngineConfig jobEngineConfig;
private String serverName;
- private class FetcherRunner implements Runnable {
- @Override
- synchronized public void run() {
- try {
- Map<String, Executable> runningJobs = context.getRunningJobs();
- if (runningJobs.size() >= jobEngineConfig.getMaxConcurrentJobLimit()) {
- logger.warn("There are too many jobs running, Job Fetch will wait until next schedule time");
- return;
- }
-
- int nRunning = 0, nOtherRunning = 0, nReady = 0, nOthers = 0;
- for (final String id : executableManager.getAllJobIds()) {
- if (runningJobs.containsKey(id)) {
- nRunning++;
- continue;
- }
-
- final Output output = executableManager.getOutput(id);
-
- if ((output.getState() != ExecutableState.READY)) {
- if (output.getState() == ExecutableState.RUNNING) {
- nOtherRunning++;
- } else {
- nOthers++;
- }
- continue;
- }
-
- nReady++;
- final AbstractExecutable executable = executableManager.getJob(id);
- try {
- jobPool.execute(new JobRunner(executable));
- } catch (Exception ex) {
- logger.warn(executable.toString() + " fail to schedule in server: " + serverName, ex);
- }
- }
- logger.info("Job Fetcher: " + nRunning + " should running, " + runningJobs.size() + " actual running, "
- + nOtherRunning + " running in other server, " + nReady + " ready, " + nOthers + " others");
- } catch (Exception e) {
- logger.warn("Job Fetcher caught a exception " + e);
- }
- }
- }
-
private class JobRunner implements Runnable {
private final AbstractExecutable executable;
@@ -149,12 +103,11 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
public void run() {
try (SetThreadName ignored = new SetThreadName("Scheduler %s Job %s",
System.identityHashCode(DistributedScheduler.this), executable.getId())) {
- String segmentId = executable.getParam(SEGMENT_ID);
- if (jobLock.lock(getLockPath(segmentId))) {
+ if (jobLock.lock(getLockPath(executable.getId()))) {
logger.info(executable.toString() + " scheduled in server: " + serverName);
context.addRunningJob(executable);
- segmentWithLocks.add(segmentId);
+ jobWithLocks.add(executable.getId());
executable.execute(context);
}
} catch (ExecuteException e) {
@@ -172,21 +125,21 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
//release job lock when job state is ready or running and the job server keep the cube lock.
private void releaseJobLock(AbstractExecutable executable) {
if (executable instanceof DefaultChainedExecutable) {
- String segmentId = executable.getParam(SEGMENT_ID);
ExecutableState state = executable.getStatus();
if (state != ExecutableState.READY && state != ExecutableState.RUNNING) {
- if (segmentWithLocks.contains(segmentId)) {
- logger.info(executable.toString() + " will release the lock for the segment: " + segmentId);
- jobLock.unlock(getLockPath(segmentId));
- segmentWithLocks.remove(segmentId);
+ if (jobWithLocks.contains(executable.getId())) {
+ logger.info(
+ executable.toString() + " will release the lock for the job: " + executable.getId());
+ jobLock.unlock(getLockPath(executable.getId()));
+ jobWithLocks.remove(executable.getId());
}
}
}
}
}
- //when the segment lock released but the segment related job still running, resume the job.
+ //when the job lock is released but the related job is still running, resume the job.
private class WatcherProcessImpl implements DistributedLock.Watcher {
private String serverName;
@@ -197,26 +150,21 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
@Override
public void onUnlock(String path, String nodeData) {
String[] paths = path.split("/");
- String segmentId = paths[paths.length - 1];
-
- for (final String id : executableManager.getAllJobIds()) {
- final Output output = executableManager.getOutput(id);
- if (output.getState() == ExecutableState.RUNNING) {
- AbstractExecutable executable = executableManager.getJob(id);
- if (executable instanceof DefaultChainedExecutable
- && executable.getParams().get(SEGMENT_ID).equalsIgnoreCase(segmentId)
- && !nodeData.equalsIgnoreCase(serverName)) {
- try {
- logger.warn(nodeData + " has released the lock for: " + segmentId
- + " but the job still running. so " + serverName + " resume the job");
- if (!jobLock.isLocked(getLockPath(segmentId))) {
- executableManager.resumeRunningJobForce(executable.getId());
- fetcherPool.schedule(fetcher, 0, TimeUnit.SECONDS);
- break;
- }
- } catch (Exception e) {
- logger.error("resume the job but fail in server: " + serverName, e);
+ String jobId = paths[paths.length - 1];
+
+ final Output output = executableManager.getOutput(jobId);
+ if (output.getState() == ExecutableState.RUNNING) {
+ AbstractExecutable executable = executableManager.getJob(jobId);
+ if (executable instanceof DefaultChainedExecutable && !nodeData.equalsIgnoreCase(serverName)) {
+ try {
+ logger.warn(nodeData + " has released the lock for: " + jobId
+ + " but the job still running. so " + serverName + " resume the job");
+ if (!jobLock.isLocked(getLockPath(jobId))) {
+ executableManager.resumeRunningJobForce(executable.getId());
+ fetcherPool.schedule(fetcher, 0, TimeUnit.SECONDS);
}
+ } catch (Exception e) {
+ logger.error("resume the job but fail in server: " + serverName, e);
}
}
}
@@ -273,7 +221,15 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
int pollSecond = jobEngineConfig.getPollIntervalSecond();
logger.info("Fetching jobs every {} seconds", pollSecond);
- fetcher = new FetcherRunner();
+ JobExecutor jobExecutor = new JobExecutor() {
+ @Override
+ public void execute(AbstractExecutable executable) {
+ jobPool.execute(new JobRunner(executable));
+ }
+ };
+ fetcher = jobEngineConfig.getJobPriorityConsidered()
+ ? new PriorityFetcherRunner(jobEngineConfig, context, executableManager, jobExecutor)
+ : new DefaultFetcherRunner(jobEngineConfig, context, executableManager, jobExecutor);
fetcherPool.scheduleAtFixedRate(fetcher, pollSecond / 10, pollSecond, TimeUnit.SECONDS);
hasStarted = true;
@@ -286,7 +242,7 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
AbstractExecutable executable = executableManager.getJob(id);
if (output.getState() == ExecutableState.RUNNING && executable instanceof DefaultChainedExecutable) {
try {
- if (!jobLock.isLocked(getLockPath(executable.getParam(SEGMENT_ID)))) {
+ if (!jobLock.isLocked(getLockPath(executable.getId()))) {
executableManager.resumeRunningJobForce(executable.getId());
fetcherPool.schedule(fetcher, 0, TimeUnit.SECONDS);
}
@@ -334,8 +290,8 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
}
private void releaseAllLocks() {
- for (String segmentId : segmentWithLocks) {
- jobLock.unlock(getLockPath(segmentId));
+ for (String jobId : jobWithLocks) {
+ jobLock.unlock(getLockPath(jobId));
}
}
diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/FetcherRunner.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/FetcherRunner.java
new file mode 100644
index 0000000..d98ca33
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/FetcherRunner.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.impl.threadpool;
+
+import java.util.Map;
+
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.Executable;
+import org.apache.kylin.job.execution.ExecutableManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+public abstract class FetcherRunner implements Runnable {
+
+ private static final Logger logger = LoggerFactory.getLogger(FetcherRunner.class);
+
+ protected JobEngineConfig jobEngineConfig;
+ protected DefaultContext context;
+ protected ExecutableManager executableManager;
+ protected JobExecutor jobExecutor;
+ protected volatile boolean fetchFailed = false;
+
+ public FetcherRunner(JobEngineConfig jobEngineConfig, DefaultContext context, ExecutableManager executableManager,
+ JobExecutor jobExecutor) {
+ this.jobEngineConfig = jobEngineConfig;
+ this.context = context;
+ this.executableManager = executableManager;
+ this.jobExecutor = jobExecutor;
+ }
+
+ protected boolean isJobPoolFull() {
+ Map<String, Executable> runningJobs = context.getRunningJobs();
+ if (runningJobs.size() >= jobEngineConfig.getMaxConcurrentJobLimit()) {
+ logger.warn("There are too many jobs running, Job Fetch will wait until next schedule time");
+ return true;
+ }
+
+ return false;
+ }
+
+ protected void addToJobPool(AbstractExecutable executable, int priority) {
+ String jobDesc = executable.toString();
+ logger.info(jobDesc + " prepare to schedule and its priority is " + priority);
+ try {
+ context.addRunningJob(executable);
+ jobExecutor.execute(executable);
+ logger.info(jobDesc + " scheduled");
+ } catch (Exception ex) {
+ context.removeRunningJob(executable);
+ logger.warn(jobDesc + " fail to schedule", ex);
+ }
+ }
+
+ @VisibleForTesting
+ void setFetchFailed(boolean fetchFailed) {
+ this.fetchFailed = fetchFailed;
+ }
+}
diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/JobExecutor.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/JobExecutor.java
new file mode 100644
index 0000000..d2efd22
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/JobExecutor.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.job.impl.threadpool;
+
+import org.apache.kylin.job.execution.AbstractExecutable;
+/** Callback a FetcherRunner uses to hand a job it has selected over to the owning scheduler. */
+public interface JobExecutor {
+ void execute(AbstractExecutable executable); // called by FetcherRunner.addToJobPool for each job it schedules
+}
diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/PriorityFetcherRunner.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/PriorityFetcherRunner.java
new file mode 100644
index 0000000..b562fac
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/PriorityFetcherRunner.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.impl.threadpool;
+
+import java.util.Comparator;
+import java.util.Map;
+import java.util.PriorityQueue;
+
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.common.util.SetThreadName;
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.Executable;
+import org.apache.kylin.job.execution.ExecutableManager;
+import org.apache.kylin.job.execution.ExecutableState;
+import org.apache.kylin.job.execution.Output;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Maps;
+/** Fetcher that queues READY jobs by priority and raises a job's priority by one for each round it waits. */
+public class PriorityFetcherRunner extends FetcherRunner {
+
+    private static final Logger logger = LoggerFactory.getLogger(PriorityFetcherRunner.class);
+    // Waiting jobs, highest priority first; only accessed from the synchronized run() below.
+    private final PriorityQueue<Pair<AbstractExecutable, Integer>> jobPriorityQueue = new PriorityQueue<>(1,
+            new Comparator<Pair<AbstractExecutable, Integer>>() {
+                @Override
+                public int compare(Pair<AbstractExecutable, Integer> o1, Pair<AbstractExecutable, Integer> o2) {
+                    return Integer.compare(o2.getSecond(), o1.getSecond()); // descending, 0 for equal priorities
+                }
+            });
+
+    public PriorityFetcherRunner(JobEngineConfig jobEngineConfig, DefaultContext context,
+            ExecutableManager executableManager, JobExecutor jobExecutor) {
+        super(jobEngineConfig, context, executableManager, jobExecutor);
+    }
+    // One fetch round: submit aged jobs from the queue, rescan all job ids, then fill the pool by priority.
+    @Override
+    synchronized public void run() {
+        try (SetThreadName ignored = new SetThreadName(//
+                "PriorityFetcherRunner %s", System.identityHashCode(this))) {//
+            // fetch job from jobPriorityQueue first to reduce chance to scan job list
+            // NOTE(review): this pass submits without consulting isJobPoolFull() - confirm that is intended
+            Map<String, Integer> leftJobPriorities = Maps.newHashMap();
+            Pair<AbstractExecutable, Integer> executableWithPriority;
+
+            while ((executableWithPriority = jobPriorityQueue.peek()) != null
+                    // only jobs whose priority has reached the configured bar are handled in this pass
+                    && executableWithPriority.getSecond() >= jobEngineConfig.getFetchQueuePriorityBar()) {
+                executableWithPriority = jobPriorityQueue.poll();
+                AbstractExecutable executable = executableWithPriority.getFirst();
+                int curPriority = executableWithPriority.getSecond();
+                // submit only once the job has aged more than one round past its default priority
+                if (curPriority > executable.getDefaultPriority() + 1) {
+                    addToJobPool(executable, curPriority);
+                } else {
+                    leftJobPriorities.put(executable.getId(), curPriority + 1);
+                }
+            }
+            // NOTE(review): returning below discards the priorities saved in leftJobPriorities above
+            Map<String, Executable> runningJobs = context.getRunningJobs();
+            if (isJobPoolFull()) {
+                return;
+            }
+            // drain the rest of the queue, remembering each job's priority plus one round of aging
+            while ((executableWithPriority = jobPriorityQueue.poll()) != null) {
+                leftJobPriorities.put(executableWithPriority.getFirst().getId(),
+                        executableWithPriority.getSecond() + 1);
+            }
+
+            int nRunning = 0, nReady = 0, nStopped = 0, nOthers = 0, nError = 0, nDiscarded = 0, nSUCCEED = 0;
+            for (final String id : executableManager.getAllJobIds()) {
+                if (runningJobs.containsKey(id)) {
+                    // already tracked in this context's running-job map
+                    nRunning++;
+                    continue;
+                }
+
+                final Output output = executableManager.getOutput(id);
+                if ((output.getState() != ExecutableState.READY)) {
+                    // not READY, so not schedulable here: classify by state for the summary log
+                    if (output.getState() == ExecutableState.SUCCEED) {
+                        nSUCCEED++;
+                    } else if (output.getState() == ExecutableState.ERROR) {
+                        nError++;
+                    } else if (output.getState() == ExecutableState.DISCARDED) {
+                        nDiscarded++;
+                    } else if (output.getState() == ExecutableState.STOPPED) {
+                        nStopped++;
+                    } else {
+                        if (fetchFailed) { // flag is set after a failed round: force-kill jobs in any other state
+                            executableManager.forceKillJob(id);
+                            nError++;
+                        } else {
+                            nOthers++;
+                        }
+                    }
+                    continue;
+                }
+
+                AbstractExecutable executable = executableManager.getJob(id);
+                if (!executable.isReady()) {
+                    nOthers++;
+                    continue;
+                }
+
+                nReady++;
+                Integer priority = leftJobPriorities.get(id);
+                if (priority == null) {
+                    priority = executable.getDefaultPriority();
+                }
+                jobPriorityQueue.add(new Pair<>(executable, priority));
+            }
+            // check capacity BEFORE polling so a job is never removed from the queue without being submitted
+            while (!jobPriorityQueue.isEmpty() && !isJobPoolFull()) {
+                executableWithPriority = jobPriorityQueue.poll();
+                addToJobPool(executableWithPriority.getFirst(), executableWithPriority.getSecond());
+            }
+            fetchFailed = false;
+            logger.info("Priority Job Fetcher: " + nRunning + " running, " + runningJobs.size() + " actual running, "
+                    + nStopped + " stopped, " + nReady + " ready, " + jobPriorityQueue.size() + " waiting, " //
+                    + nSUCCEED + " already succeed, " + nError + " error, " + nDiscarded + " discarded, " + nOthers
+                    + " others");
+        } catch (Throwable th) {
+            fetchFailed = true; // this could happen when resource store is unavailable
+            logger.warn("Priority Job Fetcher caught a exception ", th); // pass th so the stack trace is logged
+        }
+    }
+}
diff --git a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
index d7201f2..7c66f2c 100644
--- a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
+++ b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
@@ -129,7 +129,7 @@ public abstract class BaseSchedulerTest extends LocalFileMetadataTestCase {
AbstractExecutable job = execMgr.getJob(jobId);
ExecutableState status = job.getStatus();
if (status == ExecutableState.RUNNING) {
- scheduler.fetchFailed = true;
+ scheduler.getFetcherRunner().setFetchFailed(true);
break;
}
Thread.sleep(1000);