You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2018/03/11 06:28:13 UTC

[kylin] 04/04: KYLIN-3174, Default scheduler enhancement.

This is an automated email from the ASF dual-hosted git repository.

liyang pushed a commit to branch sync
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 06f497eb603134a6b0ddbfd88739ccaddab007fb
Author: Jiatao Tao <24...@qq.com>
AuthorDate: Tue Mar 6 14:44:22 2018 +0800

    KYLIN-3174, Default scheduler enhancement.
---
 .../job/impl/threadpool/DefaultScheduler.java      | 31 +++++++++++++++-------
 1 file changed, 21 insertions(+), 10 deletions(-)

diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
index 0b01284..920601d 100644
--- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
@@ -125,7 +125,11 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti
 
         @Override
         synchronized public void run() {
-            try {
+            try (SetThreadName ignored = new SetThreadName(//
+                    "Scheduler %s PriorityFetcherRunner %s"//
+                    , System.identityHashCode(DefaultScheduler.this)//
+                    , System.identityHashCode(this)//
+            )) {//
                 // logger.debug("Job Fetcher is running...");
                 Map<String, Executable> runningJobs = context.getRunningJobs();
 
@@ -195,12 +199,12 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti
                     addToJobPool(executableWithPriority.getFirst(), executableWithPriority.getSecond());
                 }
 
-                logger.info("Job Fetcher: " + nRunning + " running, " + runningJobs.size() + " actual running, "
+                logger.info("Priority Job Fetcher: " + nRunning + " running, " + runningJobs.size() + " actual running, "
                         + nStopped + " stopped, " + nReady + " ready, " + jobPriorityQueue.size() + " waiting, " //
                         + nSUCCEED + " already succeed, " + nError + " error, " + nDiscarded + " discarded, " + nOthers
                         + " others");
-            } catch (Exception e) {
-                logger.warn("Job Fetcher caught a exception " + e);
+            } catch (Throwable th) {
+                logger.warn("Priority Job Fetcher caught a exception " + th);
             }
         }
     }
@@ -209,8 +213,11 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti
 
         @Override
         synchronized public void run() {
-            try (SetThreadName ignored = new SetThreadName("Scheduler %s FetcherRunner",
-                    System.identityHashCode(DefaultScheduler.this))) {
+            try (SetThreadName ignored = new SetThreadName(//
+                    "Scheduler %s FetcherRunner %s"//
+                    , System.identityHashCode(DefaultScheduler.this)//
+                    , System.identityHashCode(this)//
+            )) {//
                 // logger.debug("Job Fetcher is running...");
                 Map<String, Executable> runningJobs = context.getRunningJobs();
                 if (isJobPoolFull()) {
@@ -268,9 +275,9 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti
                 logger.info("Job Fetcher: " + nRunning + " should running, " + runningJobs.size() + " actual running, "
                         + nStopped + " stopped, " + nReady + " ready, " + nSUCCEED + " already succeed, " + nError
                         + " error, " + nDiscarded + " discarded, " + nOthers + " others");
-            } catch (Exception e) {
+            } catch (Throwable th) {
                 fetchFailed = true; // this could happen when resource store is unavailable
-                logger.warn("Job Fetcher caught a exception ", e);
+                logger.warn("Job Fetcher caught a exception ", th);
             }
         }
     }
@@ -353,11 +360,15 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti
                 new SynchronousQueue<Runnable>());
         context = new DefaultContext(Maps.<String, Executable> newConcurrentMap(), jobEngineConfig.getConfig());
 
+        logger.info("Staring resume all running jobs.");
         executableManager.resumeAllRunningJobs();
+        logger.info("Finishing resume all running jobs.");
 
         int pollSecond = jobEngineConfig.getPollIntervalSecond();
+
         logger.info("Fetching jobs every {} seconds", pollSecond);
         fetcher = jobEngineConfig.getJobPriorityConsidered() ? new FetcherRunnerWithPriority() : new FetcherRunner();
+        logger.info("Creating fetcher pool instance:" + System.identityHashCode(fetcher));
         fetcherPool.scheduleAtFixedRate(fetcher, pollSecond / 10, pollSecond, TimeUnit.SECONDS);
         hasStarted = true;
     }
@@ -373,14 +384,14 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti
             fetcherPool.awaitTermination(1, TimeUnit.MINUTES);
         } catch (InterruptedException e) {
             //ignore it
-            logger.warn("InterruptedException is caught!");
+            logger.warn("InterruptedException is caught when shutting down job fetcher.", e);
         }
         try {
             jobPool.shutdownNow();//interrupt
             jobPool.awaitTermination(1, TimeUnit.MINUTES);
         } catch (InterruptedException e) {
             //ignore it
-            logger.warn("InterruptedException is caught!");
+            logger.warn("InterruptedException is caught when shutting down job pool.", e);
         }
     }
 

-- 
To stop receiving notification emails like this one, please contact
liyang@apache.org.