You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2018/12/13 01:25:33 UTC

[kylin] branch master updated: KYLIN-3406 Fix for two inconsistent ExecutableManager instances

This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new fab6955  KYLIN-3406 Fix for two inconsistent ExecutableManager instances
fab6955 is described below

commit fab6955b9d31837d8bdb723c043ecd5c30eb2625
Author: hit-lacus <hi...@126.com>
AuthorDate: Wed Dec 12 21:47:37 2018 +0800

    KYLIN-3406 Fix for two inconsistent ExecutableManager instances
---
 .../kylin/job/impl/threadpool/DefaultFetcherRunner.java    | 14 ++++++--------
 .../apache/kylin/job/impl/threadpool/DefaultScheduler.java |  4 ++--
 .../kylin/job/impl/threadpool/DistributedScheduler.java    |  4 ++--
 .../apache/kylin/job/impl/threadpool/FetcherRunner.java    |  9 +++++----
 .../kylin/job/impl/threadpool/PriorityFetcherRunner.java   | 14 ++++++--------
 5 files changed, 21 insertions(+), 24 deletions(-)

diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultFetcherRunner.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultFetcherRunner.java
index 877c0d0..21cd8e9 100644
--- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultFetcherRunner.java
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultFetcherRunner.java
@@ -24,7 +24,6 @@ import org.apache.kylin.common.util.SetThreadName;
 import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.Executable;
-import org.apache.kylin.job.execution.ExecutableManager;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.execution.Output;
 import org.slf4j.Logger;
@@ -34,9 +33,8 @@ public class DefaultFetcherRunner extends FetcherRunner {
 
     private static final Logger logger = LoggerFactory.getLogger(DefaultFetcherRunner.class);
 
-    public DefaultFetcherRunner(JobEngineConfig jobEngineConfig, DefaultContext context,
-            ExecutableManager executableManager, JobExecutor jobExecutor) {
-        super(jobEngineConfig, context, executableManager, jobExecutor);
+    public DefaultFetcherRunner(JobEngineConfig jobEngineConfig, DefaultContext context, JobExecutor jobExecutor) {
+        super(jobEngineConfig, context, jobExecutor);
     }
 
     @Override
@@ -50,7 +48,7 @@ public class DefaultFetcherRunner extends FetcherRunner {
             }
 
             int nRunning = 0, nReady = 0, nStopped = 0, nOthers = 0, nError = 0, nDiscarded = 0, nSUCCEED = 0;
-            for (final String id : executableManager.getAllJobIdsInCache()) {
+            for (final String id : getExecutableManger().getAllJobIdsInCache()) {
                 if (isJobPoolFull()) {
                     return;
                 }
@@ -60,7 +58,7 @@ public class DefaultFetcherRunner extends FetcherRunner {
                     continue;
                 }
 
-                final Output outputDigest = executableManager.getOutputDigest(id);
+                final Output outputDigest = getExecutableManger().getOutputDigest(id);
                 if ((outputDigest.getState() != ExecutableState.READY)) {
                     // logger.debug("Job id:" + id + " not runnable");
                     if (outputDigest.getState() == ExecutableState.SUCCEED) {
@@ -73,7 +71,7 @@ public class DefaultFetcherRunner extends FetcherRunner {
                         nStopped++;
                     } else {
                         if (fetchFailed) {
-                            executableManager.forceKillJob(id);
+                            getExecutableManger().forceKillJob(id);
                             nError++;
                         } else {
                             nOthers++;
@@ -82,7 +80,7 @@ public class DefaultFetcherRunner extends FetcherRunner {
                     continue;
                 }
 
-                final AbstractExecutable executable = executableManager.getJob(id);
+                final AbstractExecutable executable = getExecutableManger().getJob(id);
                 if (!executable.isReady()) {
                     nOthers++;
                     continue;
diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
index 8e828f1..87670e6 100644
--- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
@@ -188,8 +188,8 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti
             }
         };
         fetcher = jobEngineConfig.getJobPriorityConsidered()
-                ? new PriorityFetcherRunner(jobEngineConfig, context, executableManager, jobExecutor)
-                : new DefaultFetcherRunner(jobEngineConfig, context, executableManager, jobExecutor);
+                ? new PriorityFetcherRunner(jobEngineConfig, context, jobExecutor)
+                : new DefaultFetcherRunner(jobEngineConfig, context, jobExecutor);
         logger.info("Creating fetcher pool instance:" + System.identityHashCode(fetcher));
         fetcherPool.scheduleAtFixedRate(fetcher, pollSecond / 10, pollSecond, TimeUnit.SECONDS);
         hasStarted = true;
diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
index 4479943..6f360d2 100644
--- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
@@ -237,8 +237,8 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
             }
         };
         fetcher = jobEngineConfig.getJobPriorityConsidered()
-                ? new PriorityFetcherRunner(jobEngineConfig, context, executableManager, jobExecutor)
-                : new DefaultFetcherRunner(jobEngineConfig, context, executableManager, jobExecutor);
+                ? new PriorityFetcherRunner(jobEngineConfig, context, jobExecutor)
+                : new DefaultFetcherRunner(jobEngineConfig, context, jobExecutor);
         fetcherPool.scheduleAtFixedRate(fetcher, pollSecond / 10, pollSecond, TimeUnit.SECONDS);
         hasStarted = true;
 
diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/FetcherRunner.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/FetcherRunner.java
index d98ca33..9d8f20e 100644
--- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/FetcherRunner.java
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/FetcherRunner.java
@@ -35,15 +35,12 @@ public abstract class FetcherRunner implements Runnable {
 
     protected JobEngineConfig jobEngineConfig;
     protected DefaultContext context;
-    protected ExecutableManager executableManager;
     protected JobExecutor jobExecutor;
     protected volatile boolean fetchFailed = false;
 
-    public FetcherRunner(JobEngineConfig jobEngineConfig, DefaultContext context, ExecutableManager executableManager,
-            JobExecutor jobExecutor) {
+    public FetcherRunner(JobEngineConfig jobEngineConfig, DefaultContext context, JobExecutor jobExecutor) {
         this.jobEngineConfig = jobEngineConfig;
         this.context = context;
-        this.executableManager = executableManager;
         this.jobExecutor = jobExecutor;
     }
 
@@ -74,4 +71,8 @@ public abstract class FetcherRunner implements Runnable {
     void setFetchFailed(boolean fetchFailed) {
         this.fetchFailed = fetchFailed;
     }
+
+    ExecutableManager getExecutableManger() {
+        return ExecutableManager.getInstance(jobEngineConfig.getConfig());
+    }
 }
diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/PriorityFetcherRunner.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/PriorityFetcherRunner.java
index 1d13afd..0792ed0 100644
--- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/PriorityFetcherRunner.java
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/PriorityFetcherRunner.java
@@ -27,7 +27,6 @@ import org.apache.kylin.common.util.SetThreadName;
 import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.Executable;
-import org.apache.kylin.job.execution.ExecutableManager;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.execution.Output;
 import org.slf4j.Logger;
@@ -47,9 +46,8 @@ public class PriorityFetcherRunner extends FetcherRunner {
                 }
             });
 
-    public PriorityFetcherRunner(JobEngineConfig jobEngineConfig, DefaultContext context,
-            ExecutableManager executableManager, JobExecutor jobExecutor) {
-        super(jobEngineConfig, context, executableManager, jobExecutor);
+    public PriorityFetcherRunner(JobEngineConfig jobEngineConfig, DefaultContext context, JobExecutor jobExecutor) {
+        super(jobEngineConfig, context, jobExecutor);
     }
 
     @Override
@@ -86,14 +84,14 @@ public class PriorityFetcherRunner extends FetcherRunner {
             }
 
             int nRunning = 0, nReady = 0, nStopped = 0, nOthers = 0, nError = 0, nDiscarded = 0, nSUCCEED = 0;
-            for (final String id : executableManager.getAllJobIdsInCache()) {
+            for (final String id : getExecutableManger().getAllJobIdsInCache()) {
                 if (runningJobs.containsKey(id)) {
                     // logger.debug("Job id:" + id + " is already running");
                     nRunning++;
                     continue;
                 }
 
-                final Output outputDigest = executableManager.getOutputDigest(id);
+                final Output outputDigest = getExecutableManger().getOutputDigest(id);
                 if ((outputDigest.getState() != ExecutableState.READY)) {
                     // logger.debug("Job id:" + id + " not runnable");
                     if (outputDigest.getState() == ExecutableState.SUCCEED) {
@@ -106,7 +104,7 @@ public class PriorityFetcherRunner extends FetcherRunner {
                         nStopped++;
                     } else {
                         if (fetchFailed) {
-                            executableManager.forceKillJob(id);
+                            getExecutableManger().forceKillJob(id);
                             nError++;
                         } else {
                             nOthers++;
@@ -115,7 +113,7 @@ public class PriorityFetcherRunner extends FetcherRunner {
                     continue;
                 }
 
-                AbstractExecutable executable = executableManager.getJob(id);
+                AbstractExecutable executable = getExecutableManger().getJob(id);
                 if (!executable.isReady()) {
                     nOthers++;
                     continue;