You are viewing a plain-text version of this content. The canonical link to the original message was not preserved in this copy.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/11/08 15:14:17 UTC
kylin git commit: KYLIN-2169 bug fix
Repository: kylin
Updated Branches:
refs/heads/master e7a20a063 -> 0c6aa760e
KYLIN-2169 bug fix
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/0c6aa760
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/0c6aa760
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/0c6aa760
Branch: refs/heads/master
Commit: 0c6aa760e81a30b7df08c9c2015af9f03c253567
Parents: e7a20a0
Author: Yang Li <li...@apache.org>
Authored: Tue Nov 8 23:13:57 2016 +0800
Committer: Yang Li <li...@apache.org>
Committed: Tue Nov 8 23:13:57 2016 +0800
----------------------------------------------------------------------
.../kylin/job/execution/AbstractExecutable.java | 12 +++++-------
.../job/execution/DefaultChainedExecutable.java | 20 ++++++++++----------
2 files changed, 15 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/0c6aa760/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
index f7b8a7c..2a4b2df 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
@@ -248,14 +248,13 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
protected final void notifyUserStatusChange(ExecutableContext context, ExecutableState state) {
try {
- final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
- List<String> users = getAllNofifyUsers(kylinConfig);
+ List<String> users = getAllNofifyUsers(config);
if (users.isEmpty()) {
logger.warn("no need to send email, user list is empty");
return;
}
final Pair<String, String> email = formatNotifications(context, state);
- doSendMail(kylinConfig, users, email);
+ doSendMail(config, users, email);
} catch (Exception e) {
logger.error("error send email", e);
}
@@ -287,13 +286,12 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
protected void sendMail(Pair<String, String> email) {
try {
- final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
- List<String> users = getAllNofifyUsers(kylinConfig);
+ List<String> users = getAllNofifyUsers(config);
if (users.isEmpty()) {
logger.warn("no need to send email, user list is empty");
return;
}
- doSendMail(kylinConfig, users, email);
+ doSendMail(config, users, email);
} catch (Exception e) {
logger.error("error send email", e);
}
@@ -378,7 +376,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
}
protected boolean needRetry() {
- return this.retry <= KylinConfig.getInstanceFromEnv().getJobRetry();
+ return this.retry <= config.getJobRetry();
}
@Override
http://git-wip-us.apache.org/repos/asf/kylin/blob/0c6aa760/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
index edc8189..621d51d 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
@@ -33,8 +33,6 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai
private final List<AbstractExecutable> subTasks = Lists.newArrayList();
- protected final ExecutableManager jobService = ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv());
-
public DefaultChainedExecutable() {
super();
}
@@ -65,9 +63,9 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai
info.put(START_TIME, Long.toString(System.currentTimeMillis()));
final long startTime = getStartTime();
if (startTime > 0) {
- jobService.updateJobOutput(getId(), ExecutableState.RUNNING, null, null);
+ getManager().updateJobOutput(getId(), ExecutableState.RUNNING, null, null);
} else {
- jobService.updateJobOutput(getId(), ExecutableState.RUNNING, info, null);
+ getManager().updateJobOutput(getId(), ExecutableState.RUNNING, info, null);
}
}
@@ -79,6 +77,8 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai
@Override
protected void onExecuteFinished(ExecuteResult result, ExecutableContext executableContext) {
+ ExecutableManager mgr = getManager();
+
if (isDiscarded()) {
setEndTime(System.currentTimeMillis());
notifyUserStatusChange(executableContext, ExecutableState.DISCARDED);
@@ -105,22 +105,22 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai
}
if (allSucceed) {
setEndTime(System.currentTimeMillis());
- jobService.updateJobOutput(getId(), ExecutableState.SUCCEED, null, null);
+ mgr.updateJobOutput(getId(), ExecutableState.SUCCEED, null, null);
notifyUserStatusChange(executableContext, ExecutableState.SUCCEED);
} else if (hasError) {
setEndTime(System.currentTimeMillis());
- jobService.updateJobOutput(getId(), ExecutableState.ERROR, null, null);
+ mgr.updateJobOutput(getId(), ExecutableState.ERROR, null, null);
notifyUserStatusChange(executableContext, ExecutableState.ERROR);
} else if (hasRunning) {
- jobService.updateJobOutput(getId(), ExecutableState.RUNNING, null, null);
+ mgr.updateJobOutput(getId(), ExecutableState.RUNNING, null, null);
} else if (hasDiscarded) {
- jobService.updateJobOutput(getId(), ExecutableState.DISCARDED, null, null);
+ mgr.updateJobOutput(getId(), ExecutableState.DISCARDED, null, null);
} else {
- jobService.updateJobOutput(getId(), ExecutableState.READY, null, null);
+ mgr.updateJobOutput(getId(), ExecutableState.READY, null, null);
}
} else {
setEndTime(System.currentTimeMillis());
- jobService.updateJobOutput(getId(), ExecutableState.ERROR, null, result.output());
+ mgr.updateJobOutput(getId(), ExecutableState.ERROR, null, result.output());
notifyUserStatusChange(executableContext, ExecutableState.ERROR);
}
}