You are viewing a plain text version of this content. The canonical link to the original was not preserved in this plain text version.
Posted to commits@kylin.apache.org by li...@apache.org on 2018/03/11 06:28:13 UTC
[kylin] 04/04: KYLIN-3174, Default scheduler enhancement.
This is an automated email from the ASF dual-hosted git repository.
liyang pushed a commit to branch sync
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 06f497eb603134a6b0ddbfd88739ccaddab007fb
Author: Jiatao Tao <24...@qq.com>
AuthorDate: Tue Mar 6 14:44:22 2018 +0800
KYLIN-3174, Default scheduler enhancement.
---
.../job/impl/threadpool/DefaultScheduler.java | 31 +++++++++++++++-------
1 file changed, 21 insertions(+), 10 deletions(-)
diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
index 0b01284..920601d 100644
--- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
@@ -125,7 +125,11 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti
@Override
synchronized public void run() {
- try {
+ try (SetThreadName ignored = new SetThreadName(//
+ "Scheduler %s PriorityFetcherRunner %s"//
+ , System.identityHashCode(DefaultScheduler.this)//
+ , System.identityHashCode(this)//
+ )) {//
// logger.debug("Job Fetcher is running...");
Map<String, Executable> runningJobs = context.getRunningJobs();
@@ -195,12 +199,12 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti
addToJobPool(executableWithPriority.getFirst(), executableWithPriority.getSecond());
}
- logger.info("Job Fetcher: " + nRunning + " running, " + runningJobs.size() + " actual running, "
+ logger.info("Priority Job Fetcher: " + nRunning + " running, " + runningJobs.size() + " actual running, "
+ nStopped + " stopped, " + nReady + " ready, " + jobPriorityQueue.size() + " waiting, " //
+ nSUCCEED + " already succeed, " + nError + " error, " + nDiscarded + " discarded, " + nOthers
+ " others");
- } catch (Exception e) {
- logger.warn("Job Fetcher caught a exception " + e);
+ } catch (Throwable th) {
+ logger.warn("Priority Job Fetcher caught a exception " + th);
}
}
}
@@ -209,8 +213,11 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti
@Override
synchronized public void run() {
- try (SetThreadName ignored = new SetThreadName("Scheduler %s FetcherRunner",
- System.identityHashCode(DefaultScheduler.this))) {
+ try (SetThreadName ignored = new SetThreadName(//
+ "Scheduler %s FetcherRunner %s"//
+ , System.identityHashCode(DefaultScheduler.this)//
+ , System.identityHashCode(this)//
+ )) {//
// logger.debug("Job Fetcher is running...");
Map<String, Executable> runningJobs = context.getRunningJobs();
if (isJobPoolFull()) {
@@ -268,9 +275,9 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti
logger.info("Job Fetcher: " + nRunning + " should running, " + runningJobs.size() + " actual running, "
+ nStopped + " stopped, " + nReady + " ready, " + nSUCCEED + " already succeed, " + nError
+ " error, " + nDiscarded + " discarded, " + nOthers + " others");
- } catch (Exception e) {
+ } catch (Throwable th) {
fetchFailed = true; // this could happen when resource store is unavailable
- logger.warn("Job Fetcher caught a exception ", e);
+ logger.warn("Job Fetcher caught a exception ", th);
}
}
}
@@ -353,11 +360,15 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti
new SynchronousQueue<Runnable>());
context = new DefaultContext(Maps.<String, Executable> newConcurrentMap(), jobEngineConfig.getConfig());
+ logger.info("Staring resume all running jobs.");
executableManager.resumeAllRunningJobs();
+ logger.info("Finishing resume all running jobs.");
int pollSecond = jobEngineConfig.getPollIntervalSecond();
+
logger.info("Fetching jobs every {} seconds", pollSecond);
fetcher = jobEngineConfig.getJobPriorityConsidered() ? new FetcherRunnerWithPriority() : new FetcherRunner();
+ logger.info("Creating fetcher pool instance:" + System.identityHashCode(fetcher));
fetcherPool.scheduleAtFixedRate(fetcher, pollSecond / 10, pollSecond, TimeUnit.SECONDS);
hasStarted = true;
}
@@ -373,14 +384,14 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti
fetcherPool.awaitTermination(1, TimeUnit.MINUTES);
} catch (InterruptedException e) {
//ignore it
- logger.warn("InterruptedException is caught!");
+ logger.warn("InterruptedException is caught when shutting down job fetcher.", e);
}
try {
jobPool.shutdownNow();//interrupt
jobPool.awaitTermination(1, TimeUnit.MINUTES);
} catch (InterruptedException e) {
//ignore it
- logger.warn("InterruptedException is caught!");
+ logger.warn("InterruptedException is caught when shutting down job pool.", e);
}
}
--
To stop receiving notification emails like this one, please contact
liyang@apache.org.