You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by qh...@apache.org on 2015/07/02 10:45:47 UTC
incubator-kylin git commit: KYLIN-869
Repository: incubator-kylin
Updated Branches:
refs/heads/0.8 cb46b8085 -> 6d920a9ff
KYLIN-869
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/6d920a9f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/6d920a9f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/6d920a9f
Branch: refs/heads/0.8
Commit: 6d920a9ff1fec82b95013bec082754ed90416ba6
Parents: cb46b80
Author: qianhao.zhou <qi...@ebay.com>
Authored: Thu Jul 2 14:18:00 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Thu Jul 2 16:45:29 2015 +0800
----------------------------------------------------------------------
.../java/org/apache/kylin/cube/model/CubeDesc.java | 10 ++++++++++
.../java/org/apache/kylin/job/cube/CubingJob.java | 17 +++++++++++++----
.../kylin/job/execution/AbstractExecutable.java | 6 +++---
.../job/execution/DefaultChainedExecutable.java | 8 ++++----
4 files changed, 30 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6d920a9f/cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
index 2f96ceb..a335ea7 100644
--- a/cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
+++ b/cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
@@ -113,6 +113,8 @@ public class CubeDesc extends RootPersistentEntity {
private String signature;
@JsonProperty("notify_list")
private List<String> notifyList;
+ @JsonProperty("status_need_notify")
+ private List<String> statusNeedNotify = Collections.emptyList();
private Map<String, Map<String, TblColRef>> columnMap = new HashMap<String, Map<String, TblColRef>>();
private LinkedHashSet<TblColRef> allColumns = new LinkedHashSet<TblColRef>();
@@ -370,6 +372,14 @@ public class CubeDesc extends RootPersistentEntity {
this.notifyList = notifyList;
}
+ public List<String> getStatusNeedNotify() {
+ return statusNeedNotify;
+ }
+
+ public void setStatusNeedNotify(List<String> statusNeedNotify) {
+ this.statusNeedNotify = statusNeedNotify;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o)
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6d920a9f/job/src/main/java/org/apache/kylin/job/cube/CubingJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cube/CubingJob.java b/job/src/main/java/org/apache/kylin/job/cube/CubingJob.java
index 4b45b94..bfb716c 100644
--- a/job/src/main/java/org/apache/kylin/job/cube/CubingJob.java
+++ b/job/src/main/java/org/apache/kylin/job/cube/CubingJob.java
@@ -24,6 +24,8 @@ import java.util.Date;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.job.common.MapReduceExecutable;
import org.apache.kylin.job.constant.ExecutableConstants;
import org.apache.kylin.job.execution.AbstractExecutable;
@@ -63,18 +65,25 @@ public class CubingJob extends DefaultChainedExecutable {
}
@Override
- protected Pair<String, String> formatNotifications(ExecutableState state) {
+ protected Pair<String, String> formatNotifications(ExecutableContext context, ExecutableState state) {
+ CubeInstance cubeInstance = CubeManager.getInstance(context.getConfig()).getCube(getCubeName());
final Output output = jobService.getOutput(getId());
String logMsg;
- switch (output.getState()) {
+ state = output.getState();
+ if (state != ExecutableState.ERROR &&
+ !cubeInstance.getDescriptor().getStatusNeedNotify().contains(state.toString().toLowerCase())) {
+ logger.info("state:" + state + " no need to notify users");
+ return null;
+ }
+ switch (state) {
case ERROR:
logMsg = output.getVerboseMsg();
break;
case DISCARDED:
- logMsg = "";
+ logMsg = "job has been discarded";
break;
case SUCCEED:
- logMsg = "";
+ logMsg = "job has succeeded";
break;
default:
return null;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6d920a9f/job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
index d5b801b..be82b3a 100644
--- a/job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
+++ b/job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
@@ -190,11 +190,11 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
setNotifyList(StringUtils.join(notifications, ","));
}
- protected Pair<String, String> formatNotifications(ExecutableState state) {
+ protected Pair<String, String> formatNotifications(ExecutableContext executableContext, ExecutableState state) {
return null;
}
- protected final void notifyUserStatusChange(ExecutableState state) {
+ protected final void notifyUserStatusChange(ExecutableContext context, ExecutableState state) {
try {
List<String> users = Lists.newArrayList();
users.addAll(getNotifyList());
@@ -207,7 +207,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
if (users.isEmpty()) {
return;
}
- final Pair<String, String> email = formatNotifications(state);
+ final Pair<String, String> email = formatNotifications(context, state);
if (email == null) {
return;
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6d920a9f/job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java b/job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
index 2e7bcde..19b0d74 100644
--- a/job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
+++ b/job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
@@ -68,7 +68,7 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai
protected void onExecuteFinished(ExecuteResult result, ExecutableContext executableContext) {
if (isDiscarded()) {
setEndTime(System.currentTimeMillis());
- notifyUserStatusChange(ExecutableState.DISCARDED);
+ notifyUserStatusChange(executableContext, ExecutableState.DISCARDED);
} else if (result.succeed()) {
List<? extends Executable> jobs = getTasks();
boolean allSucceed = true;
@@ -85,18 +85,18 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai
if (allSucceed) {
setEndTime(System.currentTimeMillis());
jobService.updateJobOutput(getId(), ExecutableState.SUCCEED, null, null);
- notifyUserStatusChange(ExecutableState.SUCCEED);
+ notifyUserStatusChange(executableContext, ExecutableState.SUCCEED);
} else if (hasError) {
setEndTime(System.currentTimeMillis());
jobService.updateJobOutput(getId(), ExecutableState.ERROR, null, null);
- notifyUserStatusChange(ExecutableState.ERROR);
+ notifyUserStatusChange(executableContext, ExecutableState.ERROR);
} else {
jobService.updateJobOutput(getId(), ExecutableState.READY, null, null);
}
} else {
setEndTime(System.currentTimeMillis());
jobService.updateJobOutput(getId(), ExecutableState.ERROR, null, null);
- notifyUserStatusChange(ExecutableState.ERROR);
+ notifyUserStatusChange(executableContext, ExecutableState.ERROR);
}
}