You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by qh...@apache.org on 2015/07/02 10:45:47 UTC

incubator-kylin git commit: KYLIN-869

Repository: incubator-kylin
Updated Branches:
  refs/heads/0.8 cb46b8085 -> 6d920a9ff


KYLIN-869


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/6d920a9f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/6d920a9f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/6d920a9f

Branch: refs/heads/0.8
Commit: 6d920a9ff1fec82b95013bec082754ed90416ba6
Parents: cb46b80
Author: qianhao.zhou <qi...@ebay.com>
Authored: Thu Jul 2 14:18:00 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Thu Jul 2 16:45:29 2015 +0800

----------------------------------------------------------------------
 .../java/org/apache/kylin/cube/model/CubeDesc.java | 10 ++++++++++
 .../java/org/apache/kylin/job/cube/CubingJob.java  | 17 +++++++++++++----
 .../kylin/job/execution/AbstractExecutable.java    |  6 +++---
 .../job/execution/DefaultChainedExecutable.java    |  8 ++++----
 4 files changed, 30 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6d920a9f/cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
index 2f96ceb..a335ea7 100644
--- a/cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
+++ b/cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
@@ -113,6 +113,8 @@ public class CubeDesc extends RootPersistentEntity {
     private String signature;
     @JsonProperty("notify_list")
     private List<String> notifyList;
+    @JsonProperty("status_need_notify")
+    private List<String> statusNeedNotify = Collections.emptyList();
 
     private Map<String, Map<String, TblColRef>> columnMap = new HashMap<String, Map<String, TblColRef>>();
     private LinkedHashSet<TblColRef> allColumns = new LinkedHashSet<TblColRef>();
@@ -370,6 +372,14 @@ public class CubeDesc extends RootPersistentEntity {
         this.notifyList = notifyList;
     }
 
+    public List<String> getStatusNeedNotify() {
+        return statusNeedNotify;
+    }
+
+    public void setStatusNeedNotify(List<String> statusNeedNotify) {
+        this.statusNeedNotify = statusNeedNotify;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o)

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6d920a9f/job/src/main/java/org/apache/kylin/job/cube/CubingJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cube/CubingJob.java b/job/src/main/java/org/apache/kylin/job/cube/CubingJob.java
index 4b45b94..bfb716c 100644
--- a/job/src/main/java/org/apache/kylin/job/cube/CubingJob.java
+++ b/job/src/main/java/org/apache/kylin/job/cube/CubingJob.java
@@ -24,6 +24,8 @@ import java.util.Date;
 
 import org.apache.commons.lang3.tuple.Pair;
 
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.job.common.MapReduceExecutable;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.execution.AbstractExecutable;
@@ -63,18 +65,25 @@ public class CubingJob extends DefaultChainedExecutable {
     }
 
     @Override
-    protected Pair<String, String> formatNotifications(ExecutableState state) {
+    protected Pair<String, String> formatNotifications(ExecutableContext context, ExecutableState state) {
+        CubeInstance cubeInstance = CubeManager.getInstance(context.getConfig()).getCube(getCubeName());
         final Output output = jobService.getOutput(getId());
         String logMsg;
-        switch (output.getState()) {
+        state = output.getState();
+        if (state != ExecutableState.ERROR &&
+                !cubeInstance.getDescriptor().getStatusNeedNotify().contains(state.toString().toLowerCase())) {
+            logger.info("state:" + state + " no need to notify users");
+            return null;
+        }
+        switch (state) {
             case ERROR:
                 logMsg = output.getVerboseMsg();
                 break;
             case DISCARDED:
-                logMsg = "";
+                logMsg = "job has been discarded";
                 break;
             case SUCCEED:
-                logMsg = "";
+                logMsg = "job has succeeded";
                 break;
             default:
                 return null;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6d920a9f/job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
index d5b801b..be82b3a 100644
--- a/job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
+++ b/job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
@@ -190,11 +190,11 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
         setNotifyList(StringUtils.join(notifications, ","));
     }
 
-    protected Pair<String, String> formatNotifications(ExecutableState state) {
+    protected Pair<String, String> formatNotifications(ExecutableContext executableContext, ExecutableState state) {
         return null;
     }
 
-    protected final void notifyUserStatusChange(ExecutableState state) {
+    protected final void notifyUserStatusChange(ExecutableContext context, ExecutableState state) {
         try {
             List<String> users = Lists.newArrayList();
             users.addAll(getNotifyList());
@@ -207,7 +207,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
             if (users.isEmpty()) {
                 return;
             }
-            final Pair<String, String> email = formatNotifications(state);
+            final Pair<String, String> email = formatNotifications(context, state);
             if (email == null) {
                 return;
             }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6d920a9f/job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java b/job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
index 2e7bcde..19b0d74 100644
--- a/job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
+++ b/job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
@@ -68,7 +68,7 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai
     protected void onExecuteFinished(ExecuteResult result, ExecutableContext executableContext) {
         if (isDiscarded()) {
             setEndTime(System.currentTimeMillis());
-            notifyUserStatusChange(ExecutableState.DISCARDED);
+            notifyUserStatusChange(executableContext, ExecutableState.DISCARDED);
         } else if (result.succeed()) {
             List<? extends Executable> jobs = getTasks();
             boolean allSucceed = true;
@@ -85,18 +85,18 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai
             if (allSucceed) {
                 setEndTime(System.currentTimeMillis());
                 jobService.updateJobOutput(getId(), ExecutableState.SUCCEED, null, null);
-                notifyUserStatusChange(ExecutableState.SUCCEED);
+                notifyUserStatusChange(executableContext, ExecutableState.SUCCEED);
             } else if (hasError) {
                 setEndTime(System.currentTimeMillis());
                 jobService.updateJobOutput(getId(), ExecutableState.ERROR, null, null);
-                notifyUserStatusChange(ExecutableState.ERROR);
+                notifyUserStatusChange(executableContext, ExecutableState.ERROR);
             } else {
                 jobService.updateJobOutput(getId(), ExecutableState.READY, null, null);
             }
         } else {
             setEndTime(System.currentTimeMillis());
             jobService.updateJobOutput(getId(), ExecutableState.ERROR, null, null);
-            notifyUserStatusChange(ExecutableState.ERROR);
+            notifyUserStatusChange(executableContext, ExecutableState.ERROR);
         }
     }