You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2018/12/13 01:25:33 UTC
[kylin] branch master updated: KYLIN-3406 Fix for two inconsistent ExecuteManger
This is an automated email from the ASF dual-hosted git repository.
shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push:
new fab6955 KYLIN-3406 Fix for two inconsistent ExecuteManger
fab6955 is described below
commit fab6955b9d31837d8bdb723c043ecd5c30eb2625
Author: hit-lacus <hi...@126.com>
AuthorDate: Wed Dec 12 21:47:37 2018 +0800
KYLIN-3406 Fix for two inconsistent ExecuteManger
---
.../kylin/job/impl/threadpool/DefaultFetcherRunner.java | 14 ++++++--------
.../apache/kylin/job/impl/threadpool/DefaultScheduler.java | 4 ++--
.../kylin/job/impl/threadpool/DistributedScheduler.java | 4 ++--
.../apache/kylin/job/impl/threadpool/FetcherRunner.java | 9 +++++----
.../kylin/job/impl/threadpool/PriorityFetcherRunner.java | 14 ++++++--------
5 files changed, 21 insertions(+), 24 deletions(-)
diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultFetcherRunner.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultFetcherRunner.java
index 877c0d0..21cd8e9 100644
--- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultFetcherRunner.java
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultFetcherRunner.java
@@ -24,7 +24,6 @@ import org.apache.kylin.common.util.SetThreadName;
import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.Executable;
-import org.apache.kylin.job.execution.ExecutableManager;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.execution.Output;
import org.slf4j.Logger;
@@ -34,9 +33,8 @@ public class DefaultFetcherRunner extends FetcherRunner {
private static final Logger logger = LoggerFactory.getLogger(DefaultFetcherRunner.class);
- public DefaultFetcherRunner(JobEngineConfig jobEngineConfig, DefaultContext context,
- ExecutableManager executableManager, JobExecutor jobExecutor) {
- super(jobEngineConfig, context, executableManager, jobExecutor);
+ public DefaultFetcherRunner(JobEngineConfig jobEngineConfig, DefaultContext context, JobExecutor jobExecutor) {
+ super(jobEngineConfig, context, jobExecutor);
}
@Override
@@ -50,7 +48,7 @@ public class DefaultFetcherRunner extends FetcherRunner {
}
int nRunning = 0, nReady = 0, nStopped = 0, nOthers = 0, nError = 0, nDiscarded = 0, nSUCCEED = 0;
- for (final String id : executableManager.getAllJobIdsInCache()) {
+ for (final String id : getExecutableManger().getAllJobIdsInCache()) {
if (isJobPoolFull()) {
return;
}
@@ -60,7 +58,7 @@ public class DefaultFetcherRunner extends FetcherRunner {
continue;
}
- final Output outputDigest = executableManager.getOutputDigest(id);
+ final Output outputDigest = getExecutableManger().getOutputDigest(id);
if ((outputDigest.getState() != ExecutableState.READY)) {
// logger.debug("Job id:" + id + " not runnable");
if (outputDigest.getState() == ExecutableState.SUCCEED) {
@@ -73,7 +71,7 @@ public class DefaultFetcherRunner extends FetcherRunner {
nStopped++;
} else {
if (fetchFailed) {
- executableManager.forceKillJob(id);
+ getExecutableManger().forceKillJob(id);
nError++;
} else {
nOthers++;
@@ -82,7 +80,7 @@ public class DefaultFetcherRunner extends FetcherRunner {
continue;
}
- final AbstractExecutable executable = executableManager.getJob(id);
+ final AbstractExecutable executable = getExecutableManger().getJob(id);
if (!executable.isReady()) {
nOthers++;
continue;
diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
index 8e828f1..87670e6 100644
--- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
@@ -188,8 +188,8 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti
}
};
fetcher = jobEngineConfig.getJobPriorityConsidered()
- ? new PriorityFetcherRunner(jobEngineConfig, context, executableManager, jobExecutor)
- : new DefaultFetcherRunner(jobEngineConfig, context, executableManager, jobExecutor);
+ ? new PriorityFetcherRunner(jobEngineConfig, context, jobExecutor)
+ : new DefaultFetcherRunner(jobEngineConfig, context, jobExecutor);
logger.info("Creating fetcher pool instance:" + System.identityHashCode(fetcher));
fetcherPool.scheduleAtFixedRate(fetcher, pollSecond / 10, pollSecond, TimeUnit.SECONDS);
hasStarted = true;
diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
index 4479943..6f360d2 100644
--- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
@@ -237,8 +237,8 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
}
};
fetcher = jobEngineConfig.getJobPriorityConsidered()
- ? new PriorityFetcherRunner(jobEngineConfig, context, executableManager, jobExecutor)
- : new DefaultFetcherRunner(jobEngineConfig, context, executableManager, jobExecutor);
+ ? new PriorityFetcherRunner(jobEngineConfig, context, jobExecutor)
+ : new DefaultFetcherRunner(jobEngineConfig, context, jobExecutor);
fetcherPool.scheduleAtFixedRate(fetcher, pollSecond / 10, pollSecond, TimeUnit.SECONDS);
hasStarted = true;
diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/FetcherRunner.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/FetcherRunner.java
index d98ca33..9d8f20e 100644
--- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/FetcherRunner.java
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/FetcherRunner.java
@@ -35,15 +35,12 @@ public abstract class FetcherRunner implements Runnable {
protected JobEngineConfig jobEngineConfig;
protected DefaultContext context;
- protected ExecutableManager executableManager;
protected JobExecutor jobExecutor;
protected volatile boolean fetchFailed = false;
- public FetcherRunner(JobEngineConfig jobEngineConfig, DefaultContext context, ExecutableManager executableManager,
- JobExecutor jobExecutor) {
+ public FetcherRunner(JobEngineConfig jobEngineConfig, DefaultContext context, JobExecutor jobExecutor) {
this.jobEngineConfig = jobEngineConfig;
this.context = context;
- this.executableManager = executableManager;
this.jobExecutor = jobExecutor;
}
@@ -74,4 +71,8 @@ public abstract class FetcherRunner implements Runnable {
void setFetchFailed(boolean fetchFailed) {
this.fetchFailed = fetchFailed;
}
+
+ ExecutableManager getExecutableManger() {
+ return ExecutableManager.getInstance(jobEngineConfig.getConfig());
+ }
}
diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/PriorityFetcherRunner.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/PriorityFetcherRunner.java
index 1d13afd..0792ed0 100644
--- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/PriorityFetcherRunner.java
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/PriorityFetcherRunner.java
@@ -27,7 +27,6 @@ import org.apache.kylin.common.util.SetThreadName;
import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.Executable;
-import org.apache.kylin.job.execution.ExecutableManager;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.execution.Output;
import org.slf4j.Logger;
@@ -47,9 +46,8 @@ public class PriorityFetcherRunner extends FetcherRunner {
}
});
- public PriorityFetcherRunner(JobEngineConfig jobEngineConfig, DefaultContext context,
- ExecutableManager executableManager, JobExecutor jobExecutor) {
- super(jobEngineConfig, context, executableManager, jobExecutor);
+ public PriorityFetcherRunner(JobEngineConfig jobEngineConfig, DefaultContext context, JobExecutor jobExecutor) {
+ super(jobEngineConfig, context, jobExecutor);
}
@Override
@@ -86,14 +84,14 @@ public class PriorityFetcherRunner extends FetcherRunner {
}
int nRunning = 0, nReady = 0, nStopped = 0, nOthers = 0, nError = 0, nDiscarded = 0, nSUCCEED = 0;
- for (final String id : executableManager.getAllJobIdsInCache()) {
+ for (final String id : getExecutableManger().getAllJobIdsInCache()) {
if (runningJobs.containsKey(id)) {
// logger.debug("Job id:" + id + " is already running");
nRunning++;
continue;
}
- final Output outputDigest = executableManager.getOutputDigest(id);
+ final Output outputDigest = getExecutableManger().getOutputDigest(id);
if ((outputDigest.getState() != ExecutableState.READY)) {
// logger.debug("Job id:" + id + " not runnable");
if (outputDigest.getState() == ExecutableState.SUCCEED) {
@@ -106,7 +104,7 @@ public class PriorityFetcherRunner extends FetcherRunner {
nStopped++;
} else {
if (fetchFailed) {
- executableManager.forceKillJob(id);
+ getExecutableManger().forceKillJob(id);
nError++;
} else {
nOthers++;
@@ -115,7 +113,7 @@ public class PriorityFetcherRunner extends FetcherRunner {
continue;
}
- AbstractExecutable executable = executableManager.getJob(id);
+ AbstractExecutable executable = getExecutableManger().getJob(id);
if (!executable.isReady()) {
nOthers++;
continue;