You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@kylin.apache.org by li...@apache.org on 2018/01/28 11:57:50 UTC

[2/6] kylin git commit: KYLIN-2909 use EmailTemplateFactory in CubingJob (Thanks Yanghong Zhong and Julian Pan)

KYLIN-2909 use EmailTemplateFactory in CubingJob (Thanks Yanghong Zhong and Julian Pan)

Signed-off-by: lidongsjtu <li...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/c96e1b19
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/c96e1b19
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/c96e1b19

Branch: refs/heads/master
Commit: c96e1b1947ca8764066ff4390ea1f9e4aa0f6c40
Parents: 7dc1875
Author: Zhong <nj...@apache.org>
Authored: Sat Sep 30 14:26:24 2017 +0800
Committer: lidongsjtu <li...@apache.org>
Committed: Sun Jan 28 15:57:30 2018 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/KylinConfigBase.java    |   4 +
 .../kylin/common/util/EmailTemplateFactory.java |  11 +-
 .../main/resources/templates/JOB_DISCARD.ftl    |   2 +-
 .../main/resources/templates/JOB_SUCCEED.ftl    |   2 +-
 .../common/util/EmailTemplateFactoryTest.java   |  32 +++++
 .../kylin/job/constant/ExecutableConstants.java |   1 -
 .../kylin/job/execution/AbstractExecutable.java |  92 ++++++++++---
 .../kylin/job/util/ExecutableStateUtil.java     |  38 ++++++
 .../org/apache/kylin/engine/mr/CubingJob.java   | 134 ++++++++-----------
 9 files changed, 202 insertions(+), 114 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/c96e1b19/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 6d1b7f9..7b48935 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -661,6 +661,10 @@ abstract public class KylinConfigBase implements Serializable {
         return getOptional("kylin.job.tracking-url-pattern", "");
     }
 
+    public int getJobMetadataPersistRetry() {
+        return Integer.parseInt(this.getOptional("kylin.job.metadata-persist-retry", "5"));
+    }
+
     // ============================================================================
     // SOURCE.HIVE
     // ============================================================================

http://git-wip-us.apache.org/repos/asf/kylin/blob/c96e1b19/core-common/src/main/java/org/apache/kylin/common/util/EmailTemplateFactory.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/EmailTemplateFactory.java b/core-common/src/main/java/org/apache/kylin/common/util/EmailTemplateFactory.java
index fcf554d..2acea5d 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/EmailTemplateFactory.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/EmailTemplateFactory.java
@@ -27,6 +27,8 @@ import java.util.Map;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Joiner;
+
 import freemarker.template.Configuration;
 import freemarker.template.Template;
 
@@ -50,14 +52,7 @@ public class EmailTemplateFactory {
     }
 
     public static String getEmailTitle(String... titleParts) {
-        StringBuilder sb = new StringBuilder();
-        for (String part : titleParts) {
-            if (sb.length() > 0) {
-                sb.append("-");
-            }
-            sb.append("[" + part + "]");
-        }
-        return sb.toString();
+        return "[" + Joiner.on("]-[").join(titleParts).toString() + "]";
     }
 
     private static EmailTemplateFactory instance = new EmailTemplateFactory();

http://git-wip-us.apache.org/repos/asf/kylin/blob/c96e1b19/core-common/src/main/resources/templates/JOB_DISCARD.ftl
----------------------------------------------------------------------
diff --git a/core-common/src/main/resources/templates/JOB_DISCARD.ftl b/core-common/src/main/resources/templates/JOB_DISCARD.ftl
index fbef3f7..b00e9aa 100644
--- a/core-common/src/main/resources/templates/JOB_DISCARD.ftl
+++ b/core-common/src/main/resources/templates/JOB_DISCARD.ftl
@@ -33,7 +33,7 @@
 <span style="
     line-height: 1.1;font-size: 18px;">
     <p style="text-align:left;">Dear Kylin user,</p>
-    <p>It's a pity that the job is discarded.Thank you for using Kylin.</p>
+    <p>It's a pity that the job is discarded. Thank you for using Kylin.</p>
 </span>
     <hr style="margin-top: 20px;
     margin-bottom: 20px;

http://git-wip-us.apache.org/repos/asf/kylin/blob/c96e1b19/core-common/src/main/resources/templates/JOB_SUCCEED.ftl
----------------------------------------------------------------------
diff --git a/core-common/src/main/resources/templates/JOB_SUCCEED.ftl b/core-common/src/main/resources/templates/JOB_SUCCEED.ftl
index 4b443d5..f1fb45c 100644
--- a/core-common/src/main/resources/templates/JOB_SUCCEED.ftl
+++ b/core-common/src/main/resources/templates/JOB_SUCCEED.ftl
@@ -32,7 +32,7 @@
 <div style="margin-left:5%;margin-right:5%;font-family: 'Trebuchet MS ', Arial, Helvetica, sans-serif;">
 <span style="line-height: 1.1;font-size: 18px;">
     <p style="text-align:left;">Dear Kylin user,</p>
-    <p>Congratulations! Please feel free to query based on kylin cube.Thank you for using Kylin.</p>
+    <p>Congratulations! Please feel free to query based on kylin cube. Thank you for using Kylin.</p>
 </span>
     <hr style="margin-top: 20px;
             margin-bottom: 20px;

http://git-wip-us.apache.org/repos/asf/kylin/blob/c96e1b19/core-common/src/test/java/org/apache/kylin/common/util/EmailTemplateFactoryTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/util/EmailTemplateFactoryTest.java b/core-common/src/test/java/org/apache/kylin/common/util/EmailTemplateFactoryTest.java
new file mode 100644
index 0000000..6acbd75
--- /dev/null
+++ b/core-common/src/test/java/org/apache/kylin/common/util/EmailTemplateFactoryTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.common.util;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class EmailTemplateFactoryTest {
+
+    @Test
+    public void testGetEmailTitle() {
+        String[] titleParts = new String[] { "JOB", "SUCCEED" };
+        Assert.assertEquals("[" + titleParts[0] + "]-[" + titleParts[1] + "]",
+                EmailTemplateFactory.getEmailTitle(titleParts));
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c96e1b19/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
index cf61722..b9a3651 100644
--- a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
+++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
@@ -62,5 +62,4 @@ public final class ExecutableConstants {
     public static final String STEP_NAME_GARBAGE_COLLECTION_HBASE = "Garbage Collection on HBase";
     public static final String STEP_NAME_GARBAGE_COLLECTION_HDFS = "Garbage Collection on HDFS";
     public static final String STEP_NAME_REDISTRIBUTE_FLAT_HIVE_TABLE = "Redistribute Flat Hive Table";
-    public static final String NOTIFY_EMAIL_TEMPLATE = "<div><b>Build Result of Job ${job_name}</b><pre><ul>" + "<li>Build Result: <b>${result}</b></li>" + "<li>Job Engine: ${job_engine}</li>" + "<li>Env: ${env_name}</li>" + "<li>Project: ${project_name}</li>" + "<li>Cube Name: ${cube_name}</li>" + "<li>Source Records Count: ${source_records_count}</li>" + "<li>Start Time: ${start_time}</li>" + "<li>Duration: ${duration}</li>" + "<li>MR Waiting: ${mr_waiting}</li>" + "<li>Last Update Time: ${last_update_time}</li>" + "<li>Submitter: ${submitter}</li>" + "<li>Error Log: ${error_log}</li>" + "</ul></pre><div/>";
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/c96e1b19/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
index 6a0db97..f4015bf 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
@@ -24,12 +24,16 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.regex.Matcher;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang3.ArrayUtils;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.EmailTemplateEnum;
+import org.apache.kylin.common.util.EmailTemplateFactory;
 import org.apache.kylin.common.util.MailService;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.common.util.StringUtil;
 import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.exception.PersistentException;
 import org.apache.kylin.job.impl.threadpool.DefaultContext;
@@ -64,16 +68,16 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
     public AbstractExecutable() {
         setId(UUID.randomUUID().toString());
     }
-    
+
     protected void initConfig(KylinConfig config) {
         Preconditions.checkState(this.config == null || this.config == config);
         this.config = config;
     }
-    
+
     protected KylinConfig getConfig() {
         return config;
     }
-    
+
     protected ExecutableManager getManager() {
         return ExecutableManager.getInstance(config);
     }
@@ -84,6 +88,36 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
         getManager().updateJobOutput(getId(), ExecutableState.RUNNING, info, null);
     }
 
+    private void onExecuteFinishedWithRetry(ExecuteResult result, ExecutableContext executableContext)
+            throws ExecuteException {
+        Throwable exception;
+        int nRetry = 0;
+        do {
+            nRetry++;
+            exception = null;
+            try {
+                onExecuteFinished(result, executableContext);
+            } catch (Exception e) {
+                logger.error(nRetry + "th retries for onExecuteFinished fails due to {}", e);
+                if (isMetaDataPersistException(e)) {
+                    exception = e;
+                    try {
+                        Thread.sleep(1000L * (long) Math.pow(4, nRetry));
+                    } catch (InterruptedException exp) {
+                        throw new RuntimeException(exp);
+                    }
+                } else {
+                    throw e;
+                }
+            }
+        } while (exception != null && nRetry <= executableContext.getConfig().getJobMetadataPersistRetry());
+
+        if (exception != null) {
+            handleMetadataPersistException(executableContext, exception);
+            throw new ExecuteException(exception);
+        }
+    }
+
     protected void onExecuteFinished(ExecuteResult result, ExecutableContext executableContext) {
         setEndTime(System.currentTimeMillis());
         if (!isDiscarded() && !isRunnable()) {
@@ -134,28 +168,42 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
                 retry++;
             } while (needRetry(result, exception));
 
-            //check exception in result to avoid retry on ChainedExecutable(only need retry on subtask actually)
-            if (exception != null || result.getThrowable() != null) {
+            if (exception != null) {
                 onExecuteError(exception, executableContext);
                 throw new ExecuteException(exception);
             }
 
-            onExecuteFinished(result, executableContext);
+            onExecuteFinishedWithRetry(result, executableContext);
+        } catch (ExecuteException e) {
+            throw e;
         } catch (Exception e) {
-            if (isMetaDataPersistException(e)) {
-                handleMetaDataPersistException(e);
-            }
-            if (e instanceof ExecuteException) {
-                throw e;
-            } else {
-                throw new ExecuteException(e);
-            }
+            throw new ExecuteException(e);
         }
         return result;
     }
 
-    protected void handleMetaDataPersistException(Exception e) {
-        // do nothing.
+    protected void handleMetadataPersistException(ExecutableContext context, Throwable exception) {
+        final String[] adminDls = context.getConfig().getAdminDls();
+        if (adminDls == null || adminDls.length < 1) {
+            logger.warn("no need to send email, user list is empty");
+            return;
+        }
+        List<String> users = Lists.newArrayList(adminDls);
+
+        Map<String, Object> dataMap = Maps.newHashMap();
+        dataMap.put("job_name", getName());
+        dataMap.put("env_name", context.getConfig().getDeployEnv());
+        dataMap.put("submitter", StringUtil.noBlank(getSubmitter(), "missing submitter"));
+        dataMap.put("job_engine", EmailTemplateFactory.getLocalHostName());
+        dataMap.put("error_log",
+                Matcher.quoteReplacement(StringUtil.noBlank(exception.getMessage(), "no error message")));
+
+        String content = EmailTemplateFactory.getInstance().buildEmailContent(EmailTemplateEnum.METADATA_PERSIST_FAIL,
+                dataMap);
+        String title = EmailTemplateFactory.getEmailTitle("METADATA PERSIST", "FAIL",
+                context.getConfig().getDeployEnv());
+
+        new MailService(context.getConfig()).sendMail(users, title, content);
     }
 
     private boolean isMetaDataPersistException(Exception e) {
@@ -299,7 +347,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
         logger.info("job name:" + getName());
         logger.info("submitter:" + getSubmitter());
         logger.info("notify list:" + users);
-        new MailService(kylinConfig).sendMail(users, email.getLeft(), email.getRight());
+        new MailService(kylinConfig).sendMail(users, email.getFirst(), email.getSecond());
     }
 
     protected void sendMail(Pair<String, String> email) {
@@ -335,7 +383,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
     public static long getEndTime(Output output) {
         return getExtraInfoAsLong(output, END_TIME, 0L);
     }
-    
+
     public static long getInterruptTime(Output output) {
         return getExtraInfoAsLong(output, INTERRUPT_TIME, 0L);
     }
@@ -437,8 +485,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
         if (retryableEx == null || retryableEx.length == 0) {
             return true;
         }
-        if ((result != null && isRetryableExecutionResult(result))
-                || e != null && isRetrableException(e)) {
+        if ((result != null && isRetryableExecutionResult(result)) || e != null && isRetrableException(e)) {
             return true;
         }
         return false;
@@ -446,6 +493,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
 
     @Override
     public String toString() {
-        return Objects.toStringHelper(this).add("id", getId()).add("name", getName()).add("state", getStatus()).toString();
+        return Objects.toStringHelper(this).add("id", getId()).add("name", getName()).add("state", getStatus())
+                .toString();
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/c96e1b19/core-job/src/main/java/org/apache/kylin/job/util/ExecutableStateUtil.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/util/ExecutableStateUtil.java b/core-job/src/main/java/org/apache/kylin/job/util/ExecutableStateUtil.java
new file mode 100644
index 0000000..66f806c
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/util/ExecutableStateUtil.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.util;
+
+import org.apache.kylin.common.util.EmailTemplateEnum;
+import org.apache.kylin.job.execution.ExecutableState;
+
+public class ExecutableStateUtil {
+
+    public static EmailTemplateEnum getEmailTemplateEnum(ExecutableState state) {
+        switch (state) {
+        case ERROR:
+            return EmailTemplateEnum.JOB_ERROR;
+        case DISCARDED:
+            return EmailTemplateEnum.JOB_DISCARD;
+        case SUCCEED:
+            return EmailTemplateEnum.JOB_SUCCEED;
+        default:
+            return null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c96e1b19/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
index 6f26c35..548dcb9 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
@@ -18,20 +18,20 @@
 
 package org.apache.kylin.engine.mr;
 
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Date;
 import java.util.List;
+import java.util.Map;
 import java.util.TimeZone;
 import java.util.regex.Matcher;
 
-import org.apache.commons.lang3.tuple.Pair;
+
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.EmailTemplateEnum;
+import org.apache.kylin.common.util.EmailTemplateFactory;
+import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.common.util.StringUtil;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
@@ -47,12 +47,15 @@ import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.execution.ExecuteResult;
 import org.apache.kylin.job.execution.Output;
 import org.apache.kylin.job.metrics.JobMetricsFacade;
+import org.apache.kylin.job.util.ExecutableStateUtil;
 import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.metadata.project.ProjectManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
+import com.google.common.collect.Maps;
 
 /**
  */
@@ -195,52 +198,61 @@ public class CubingJob extends DefaultChainedExecutable {
         CubeInstance cubeInstance = CubeManager.getInstance(context.getConfig())
                 .getCube(CubingExecutableUtil.getCubeName(this.getParams()));
         final Output output = getManager().getOutput(getId());
-        String logMsg;
         state = output.getState();
         if (state != ExecutableState.ERROR
                 && !cubeInstance.getDescriptor().getStatusNeedNotify().contains(state.toString())) {
             logger.info("state:" + state + " no need to notify users");
             return null;
         }
-        switch (state) {
-        case ERROR:
-            logMsg = output.getVerboseMsg();
-            break;
-        case DISCARDED:
-            logMsg = "job has been discarded";
-            break;
-        case SUCCEED:
-            logMsg = "job has succeeded";
-            break;
-        default:
+
+        EmailTemplateEnum templateEnum = ExecutableStateUtil.getEmailTemplateEnum(state);
+        if (templateEnum == null) {
+            logger.info("Cannot find email template for job state: " + state);
             return null;
         }
-        String content = ExecutableConstants.NOTIFY_EMAIL_TEMPLATE;
-        content = content.replaceAll("\\$\\{job_name\\}", getName());
-        content = content.replaceAll("\\$\\{result\\}", state.toString());
-        content = content.replaceAll("\\$\\{env_name\\}", getDeployEnvName());
-        content = content.replaceAll("\\$\\{project_name\\}", getProjectName());
-        content = content.replaceAll("\\$\\{cube_name\\}", CubingExecutableUtil.getCubeName(this.getParams()));
-        content = content.replaceAll("\\$\\{source_records_count\\}", String.valueOf(findSourceRecordCount()));
-        content = content.replaceAll("\\$\\{start_time\\}", new Date(getStartTime()).toString());
-        content = content.replaceAll("\\$\\{duration\\}", getDuration() / 60000 + "mins");
-        content = content.replaceAll("\\$\\{mr_waiting\\}", getMapReduceWaitTime() / 60000 + "mins");
-        content = content.replaceAll("\\$\\{last_update_time\\}", new Date(getLastModified()).toString());
-        content = content.replaceAll("\\$\\{submitter\\}", StringUtil.noBlank(getSubmitter(), "missing submitter"));
-        content = content.replaceAll("\\$\\{error_log\\}",
-                Matcher.quoteReplacement(StringUtil.noBlank(logMsg, "no error message")));
-
-        try {
-            InetAddress inetAddress = InetAddress.getLocalHost();
-            content = content.replaceAll("\\$\\{job_engine\\}", inetAddress.getCanonicalHostName());
-        } catch (UnknownHostException e) {
-            logger.warn(e.getLocalizedMessage(), e);
-        }
 
-        String title = "[" + state.toString() + "] - [" + getDeployEnvName() + "] - [" + getProjectName() + "] - "
-                + CubingExecutableUtil.getCubeName(this.getParams());
+        Map<String, Object> dataMap = Maps.newHashMap();
+        dataMap.put("job_name", getName());
+        dataMap.put("env_name", getDeployEnvName());
+        dataMap.put("submitter", StringUtil.noBlank(getSubmitter(), "missing submitter"));
+        dataMap.put("job_engine", EmailTemplateFactory.getLocalHostName());
+        dataMap.put("project_name", getProjectName());
+        dataMap.put("cube_name", cubeInstance.getName());
+        dataMap.put("source_records_count", String.valueOf(findSourceRecordCount()));
+        dataMap.put("start_time", new Date(getStartTime()).toString());
+        dataMap.put("duration", getDuration() / 60000 + "mins");
+        dataMap.put("mr_waiting", getMapReduceWaitTime() / 60000 + "mins");
+        dataMap.put("last_update_time", new Date(getLastModified()).toString());
+
+        if (state == ExecutableState.ERROR) {
+            AbstractExecutable errorTask = null;
+            Output errorOutput = null;
+            for (AbstractExecutable task : getTasks()) {
+                errorOutput = getManager().getOutput(task.getId());
+                if (errorOutput.getState() == ExecutableState.ERROR) {
+                    errorTask = task;
+                    break;
+                }
+            }
+            Preconditions.checkNotNull(errorTask,
+                    "None of the sub tasks of cubing job " + getId() + " is error and this job should become success.");
 
-        return Pair.of(title, content);
+            dataMap.put("error_step", errorTask.getName());
+            if (errorTask instanceof MapReduceExecutable) {
+                final String mrJobId = errorOutput.getExtra().get(ExecutableConstants.MR_JOB_ID);
+                dataMap.put("mr_job_id", StringUtil.noBlank(mrJobId, "Not initialized"));
+            } else {
+                dataMap.put("mr_job_id", EmailTemplateFactory.NA);
+            }
+            dataMap.put("error_log",
+                    Matcher.quoteReplacement(StringUtil.noBlank(output.getVerboseMsg(), "no error message")));
+        }
+
+        String content = EmailTemplateFactory.getInstance()
+                .buildEmailContent(ExecutableStateUtil.getEmailTemplateEnum(state), dataMap);
+        String title = EmailTemplateFactory.getEmailTitle("JOB", state.toString(), getDeployEnvName(), getProjectName(),
+                cubeInstance.getName());
+        return Pair.newPair(title, content);
     }
 
     @Override
@@ -297,46 +309,6 @@ public class CubingJob extends DefaultChainedExecutable {
         return timeCost * 1.0 / size;
     }
 
-    /**
-     * build fail because the metadata store has problem.
-     * @param exception
-     */
-    @Override
-    protected void handleMetaDataPersistException(Exception exception) {
-        String title = "[ERROR] - [" + getDeployEnvName() + "] - [" + getProjectName() + "] - "
-                + CubingExecutableUtil.getCubeName(this.getParams());
-        String content = ExecutableConstants.NOTIFY_EMAIL_TEMPLATE;
-        final String UNKNOWN = "UNKNOWN";
-        String errMsg = null;
-        if (exception != null) {
-            final StringWriter out = new StringWriter();
-            exception.printStackTrace(new PrintWriter(out));
-            errMsg = out.toString();
-        }
-
-        content = content.replaceAll("\\$\\{job_name\\}", getName());
-        content = content.replaceAll("\\$\\{result\\}", ExecutableState.ERROR.toString());
-        content = content.replaceAll("\\$\\{env_name\\}", getDeployEnvName());
-        content = content.replaceAll("\\$\\{project_name\\}", getProjectName());
-        content = content.replaceAll("\\$\\{cube_name\\}", CubingExecutableUtil.getCubeName(this.getParams()));
-        content = content.replaceAll("\\$\\{source_records_count\\}", UNKNOWN);
-        content = content.replaceAll("\\$\\{start_time\\}", UNKNOWN);
-        content = content.replaceAll("\\$\\{duration\\}", UNKNOWN);
-        content = content.replaceAll("\\$\\{mr_waiting\\}", UNKNOWN);
-        content = content.replaceAll("\\$\\{last_update_time\\}", UNKNOWN);
-        content = content.replaceAll("\\$\\{submitter\\}", StringUtil.noBlank(getSubmitter(), "missing submitter"));
-        content = content.replaceAll("\\$\\{error_log\\}",
-                Matcher.quoteReplacement(StringUtil.noBlank(errMsg, "no error message")));
-
-        try {
-            InetAddress inetAddress = InetAddress.getLocalHost();
-            content = content.replaceAll("\\$\\{job_engine\\}", inetAddress.getCanonicalHostName());
-        } catch (UnknownHostException e) {
-            logger.warn(e.getLocalizedMessage(), e);
-        }
-        sendMail(Pair.of(title, content));
-    }
-
     public long getMapReduceWaitTime() {
         return getExtraInfoAsLong(MAP_REDUCE_WAIT_TIME, 0L);
     }