You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2023/01/13 06:02:39 UTC

[kylin] branch kylin5 updated (92af96e52a -> 80ff228502)

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a change to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git


    from 92af96e52a [DIRTY] upgrade spark version
     new 1270d6f15e KYLIN-5361 modify email notification function and adjust email hard code to config file
     new 80ff228502 KYLIN-5391 Kylin metadata tool for read specific file

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 build/bin/metastore.sh                             |  43 ++-
 .../setting/SettingAdvanced/SettingAdvanced.vue    |  40 ++-
 .../components/setting/SettingAdvanced/handler.js  |  11 +-
 .../components/setting/SettingAdvanced/locales.js  |   6 +-
 pom.xml                                            |   6 +
 .../rest/request/JobNotificationConfigRequest.java |   6 +-
 .../kylin/rest/response/ProjectConfigResponse.java |   8 +-
 .../apache/kylin/rest/service/ProjectService.java  |  18 +-
 src/core-common/pom.xml                            |   4 +
 .../org/apache/kylin/common/KylinConfigBase.java   |  15 +-
 .../constant/NonCustomProjectLevelConfig.java      |   7 +-
 .../common/util/BasicEmailNotificationContent.java |  62 ----
 .../org/apache/kylin/common/util/MailHelper.java   |  65 ++--
 .../kylin/common/util/MailTemplateProvider.java    |  87 +++++
 .../org/apache/kylin/common/util/StringUtil.java   |   4 +
 .../apache/kylin/common/KylinConfigBaseTest.java   |  12 +-
 .../apache/kylin/common/util/MailServiceTest.java  |   2 +-
 .../apache/kylin/job/constant/JobIssueEnum.java    |   4 +-
 .../kylin/job/execution/AbstractExecutable.java    | 192 +++++++---
 .../kylin/job/execution/DefaultExecutable.java     | 120 ++++---
 .../job/execution/EmailNotificationContent.java    | 211 ++++++++---
 .../kylin/job/execution/ExecutableState.java       |  13 +
 .../kylin/job/execution/NExecutableManager.java    |   5 +-
 .../kylin/job/util/MailNotificationUtil.java       | 104 ++++++
 .../main/resources/mail_templates/JOB_DISCARD.ftl  | 274 +++++++++++++++
 .../main/resources/mail_templates/JOB_ERROR.ftl    | 390 +++++++++++++++++++++
 .../main/resources/mail_templates/JOB_SUCCEED.ftl  | 273 +++++++++++++++
 .../resources/mail_templates/LOAD_EMPTY_DATA.ftl   | 200 +++++++++++
 .../mail_templates/METADATA_PERSIST_FAIL.ftl       | 232 ++++++++++++
 .../mail_templates/OVER_CAPACITY_THRESHOLD.ftl     | 200 +++++++++++
 .../mail_templates/SOURCE_RECORDS_CHANGE.ftl       | 205 +++++++++++
 .../kylin/job/execution/ErrorTestExecutable.java   |   4 +-
 .../job/execution/NExecutableManagerTest.java      | 101 +++++-
 .../kylin/job/execution/SucceedTestExecutable.java |   5 +-
 .../job/impl/threadpool/NDefaultSchedulerTest.java |   3 +-
 .../kylin/metadata/project/ProjectInstance.java    |  10 +
 .../metadata/sourceusage/SourceUsageManager.java   |   6 +-
 .../test_case_data/localmeta/kylin.properties      |  23 +-
 .../apache/kylin/rest/service/JobServiceTest.java  |   7 +-
 .../kylin/rest/controller/NProjectController.java  |   9 +-
 .../rest/controller/NProjectControllerTest.java    |   5 +-
 .../kylin/rest/service/ProjectServiceTest.java     |  14 +-
 .../streaming/jobs/StreamingDFMergeJobTest.java    |   2 +-
 .../java/org/apache/kylin/tool/MetadataTool.java   |  84 ++++-
 .../org/apache/kylin/tool/MetadataToolTest.java    |  32 ++
 45 files changed, 2811 insertions(+), 313 deletions(-)
 delete mode 100644 src/core-common/src/main/java/org/apache/kylin/common/util/BasicEmailNotificationContent.java
 create mode 100644 src/core-common/src/main/java/org/apache/kylin/common/util/MailTemplateProvider.java
 create mode 100644 src/core-job/src/main/java/org/apache/kylin/job/util/MailNotificationUtil.java
 create mode 100644 src/core-job/src/main/resources/mail_templates/JOB_DISCARD.ftl
 create mode 100644 src/core-job/src/main/resources/mail_templates/JOB_ERROR.ftl
 create mode 100644 src/core-job/src/main/resources/mail_templates/JOB_SUCCEED.ftl
 create mode 100644 src/core-job/src/main/resources/mail_templates/LOAD_EMPTY_DATA.ftl
 create mode 100644 src/core-job/src/main/resources/mail_templates/METADATA_PERSIST_FAIL.ftl
 create mode 100644 src/core-job/src/main/resources/mail_templates/OVER_CAPACITY_THRESHOLD.ftl
 create mode 100644 src/core-job/src/main/resources/mail_templates/SOURCE_RECORDS_CHANGE.ftl


[kylin] 02/02: KYLIN-5391 Kylin metadata tool for read specific file

Posted by xx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 80ff2285027d1c55aa07bb07c0afd64c823a71ea
Author: zhennzhang <zh...@ebay.com>
AuthorDate: Fri Jan 13 11:50:43 2023 +0800

    KYLIN-5391 Kylin metadata tool for read specific file
---
 build/bin/metastore.sh                             | 43 ++++++++++-
 .../java/org/apache/kylin/tool/MetadataTool.java   | 84 +++++++++++++++++++++-
 .../org/apache/kylin/tool/MetadataToolTest.java    | 32 +++++++++
 3 files changed, 157 insertions(+), 2 deletions(-)

diff --git a/build/bin/metastore.sh b/build/bin/metastore.sh
index 05c998eae1..9e791eea71 100755
--- a/build/bin/metastore.sh
+++ b/build/bin/metastore.sh
@@ -26,6 +26,8 @@ fi
 
 function help {
     echo "usage: metastore.sh backup METADATA_BACKUP_PATH(the default path is KYLIN_HOME/meta_backups/)"
+    echo "       metastore.sh fetch TARGET_FILE_PATH METADATA_FETCH_PATH(the default path is KYLIN_HOME/meta_fetch/)"
+    echo "       metastore.sh list TARGET_FOLDER_PATH"
     echo "       metastore.sh restore METADATA_RESTORE_PATH [--after-truncate]"
     echo "       metastore.sh backup-project PROJECT_NAME METADATA_BACKUP_PATH(the default path is KYLIN_HOME/meta_backups/)"
     echo "       metastore.sh restore-project PROJECT_NAME METADATA_RESTORE_PATH [--after-truncate]"
@@ -44,6 +46,18 @@ function printBackupResult() {
     fi
 }
 
+# Report the outcome of "metastore.sh fetch".
+#   $1 - exit status of the MetadataTool invocation (0 means success).
+# Reads the global $path set by the fetch branch; when it is empty the
+# literal default "${KYLIN_HOME}/meta_fetch" is reported instead.
+function printFetchResult() {
+  error=$1
+  if [[ $error == 0 ]]; then
+      if [[ -z "$path" ]]; then
+          path="\${KYLIN_HOME}/meta_fetch"
+      fi
+      # Tell the user where to look; previously $path was defaulted but never printed.
+      echo -e "${YELLOW} Fetch at local disk succeed. The fetch path is ${path}.${RESTORE}"
+  else
+      echo -e "${YELLOW} Fetch failed.${RESTORE}"
+  fi
+}
+
 function printRestoreResult() {
     error=$1
 
@@ -134,6 +148,34 @@ then
     ${KYLIN_HOME}/bin/kylin.sh org.apache.kylin.tool.MetadataTool ${BACKUP_OPTS}
     printBackupResult $?
 
+elif [ "$1" == "fetch" ]
+then
+    FETCH_OPTS="-fetch"
+    if [ $# -eq 2 ]; then
+        _file=$2
+        FETCH_OPTS="${FETCH_OPTS} -target ${_file}"
+    elif [ $# -eq 3 ]; then
+        _file=$2
+        path=`cd $3 && pwd -P`
+        check_path_empty ${path}
+        FETCH_OPTS="${FETCH_OPTS} -target ${_file} -dir ${path}"
+    else
+        help
+    fi
+
+    ${KYLIN_HOME}/bin/kylin.sh org.apache.kylin.tool.MetadataTool ${FETCH_OPTS}
+    printFetchResult $?
+
+elif [ "$1" == "list" ]
+then
+    if [ $# -eq 2 ]; then
+        _folder=$2
+        LIST_OPTS="-list -target ${_folder}"
+    else
+        help
+    fi
+    ${KYLIN_HOME}/bin/kylin.sh org.apache.kylin.tool.MetadataTool ${LIST_OPTS}
+
 elif [ "$1" == "restore" ]
 then
     if [ $# -eq 2 ]; then
@@ -171,4 +213,3 @@ else
     help
 fi
 
-
diff --git a/src/tool/src/main/java/org/apache/kylin/tool/MetadataTool.java b/src/tool/src/main/java/org/apache/kylin/tool/MetadataTool.java
index 66aa1d8b3d..daa8aed307 100644
--- a/src/tool/src/main/java/org/apache/kylin/tool/MetadataTool.java
+++ b/src/tool/src/main/java/org/apache/kylin/tool/MetadataTool.java
@@ -87,6 +87,12 @@ public class MetadataTool extends ExecutableApplication {
     private static final Option OPERATE_COMPRESS = OptionBuilder.getInstance()
             .withDescription("Backup compressed metadata to HDFS path").isRequired(false).create("compress");
 
+    private static final Option OPERATE_FETCH = OptionBuilder.getInstance()
+            .withDescription("Fetch part of metadata to local path").isRequired(false).create("fetch");
+
+    private static final Option OPERATE_LIST = OptionBuilder.getInstance()
+            .withDescription("List children of target folder").isRequired(false).create("list");
+
     private static final Option OPERATE_RESTORE = OptionBuilder.getInstance()
             .withDescription("Restore metadata from local path or HDFS path").isRequired(false).create("restore");
 
@@ -100,6 +106,9 @@ public class MetadataTool extends ExecutableApplication {
     private static final Option OPTION_PROJECT = OptionBuilder.getInstance().hasArg().withArgName("PROJECT_NAME")
             .withDescription("Specify project level backup and restore (optional)").isRequired(false).create("project");
 
+    private static final Option OPTION_TARGET = OptionBuilder.getInstance().hasArg().withArgName("TARGET_FILE")
+            .withDescription("Specify part of metadata for fetch to local path").isRequired(false).create("target");
+
     private static final Option FOLDER_NAME = OptionBuilder.getInstance().hasArg().withArgName("FOLDER_NAME")
             .withDescription("Specify the folder name for backup").isRequired(false).create("folder");
 
@@ -116,6 +125,9 @@ public class MetadataTool extends ExecutableApplication {
     @Getter
     private String backupPath;
 
+    @Getter
+    private String fetchPath;
+
     MetadataTool() {
         kylinConfig = KylinConfig.getInstanceFromEnv();
         this.options = new Options();
@@ -154,7 +166,8 @@ public class MetadataTool extends ExecutableApplication {
             val optionsHelper = new OptionsHelper();
             optionsHelper.parseOptions(tool.getOptions(), args);
             boolean isBackup = optionsHelper.hasOption(OPERATE_BACKUP);
-            if (isBackup && ScreenPrintUtil.isMainThread()) {
+            boolean isFetch = optionsHelper.hasOption(OPERATE_FETCH);
+            if ((isBackup || isFetch) && ScreenPrintUtil.isMainThread()) {
                 config.setProperty("kylin.env.metadata.only-for-read", "true");
             }
             val resourceStore = ResourceStore.getKylinMetaStore(config);
@@ -258,12 +271,15 @@ public class MetadataTool extends ExecutableApplication {
         final OptionGroup optionGroup = new OptionGroup();
         optionGroup.setRequired(true);
         optionGroup.addOption(OPERATE_BACKUP);
+        optionGroup.addOption(OPERATE_FETCH);
+        optionGroup.addOption(OPERATE_LIST);
         optionGroup.addOption(OPERATE_RESTORE);
 
         options.addOptionGroup(optionGroup);
         options.addOption(OPTION_DIR);
         options.addOption(OPTION_PROJECT);
         options.addOption(FOLDER_NAME);
+        options.addOption(OPTION_TARGET);
         options.addOption(OPERATE_COMPRESS);
         options.addOption(OPTION_EXCLUDE_TABLE_EXD);
         options.addOption(OPTION_AFTER_TRUNCATE);
@@ -305,6 +321,10 @@ public class MetadataTool extends ExecutableApplication {
                 }
             }
 
+        } else if (optionsHelper.hasOption(OPERATE_FETCH)) {
+            fetch(optionsHelper);
+        } else if (optionsHelper.hasOption(OPERATE_LIST)) {
+            list(optionsHelper);
         } else if (optionsHelper.hasOption(OPERATE_RESTORE)) {
             restore(optionsHelper, optionsHelper.hasOption(OPTION_AFTER_TRUNCATE));
         } else {
@@ -330,6 +350,68 @@ public class MetadataTool extends ExecutableApplication {
         }
     }
 
+    /**
+     * Copies the resource named by {@code -target} (a file or a folder) from the current
+     * resource store into a separate fetch store and dumps that store.
+     *
+     * <p>The output location is {@code <dir>/<folder>}: {@code -dir} defaults to
+     * {@code KYLIN_HOME/meta_fetch} and {@code -folder} defaults to the current time
+     * formatted with {@code DATE_TIME_FORMATTER} plus the suffix {@code _fetch}.
+     * The metastore UUID resource is copied along with the target.
+     *
+     * @param optionsHelper parsed command line options
+     * @throws InterruptedException if the current thread is flagged as interrupted after the copy
+     * @throws Exception if preparing the fetch store, copying or dumping fails
+     */
+    private void fetch(OptionsHelper optionsHelper) throws Exception {
+        val target = optionsHelper.getOptionValue(OPTION_TARGET);
+        // An empty target would be turned into "/" below, i.e. the root of the store,
+        // so reject blank values as well as a missing option.
+        if (StringUtils.isBlank(target)) {
+            // NOTE(review): this returns normally, so a caller that only checks for
+            // exceptions cannot tell that nothing was fetched - confirm this is intended.
+            System.out.println("target file must be set with fetch mode");
+            return;
+        }
+
+        var path = optionsHelper.getOptionValue(OPTION_DIR);
+        if (StringUtils.isBlank(path)) {
+            path = KylinConfigBase.getKylinHome() + File.separator + "meta_fetch";
+        }
+        var folder = optionsHelper.getOptionValue(FOLDER_NAME);
+        if (StringUtils.isEmpty(folder)) {
+            folder = LocalDateTime.now(Clock.systemDefaultZone()).format(DATE_TIME_FORMATTER) + "_fetch";
+        }
+        val excludeTableExd = optionsHelper.hasOption(OPTION_EXCLUDE_TABLE_EXD);
+
+        fetchPath = StringUtils.appendIfMissing(path, "/") + folder;
+        // currently do not support compress with fetch
+        val fetchMetadataUrl = getMetadataUrl(fetchPath, false);
+        val fetchConfig = KylinConfig.createKylinConfig(kylinConfig);
+        fetchConfig.setMetadataUrl(fetchMetadataUrl);
+        abortIfAlreadyExists(fetchPath);
+        logger.info("The fetch metadataUrl is {} and fetch path is {}", fetchMetadataUrl, fetchPath);
+
+        try (val fetchResourceStore = ResourceStore.getKylinMetaStore(fetchConfig)) {
+            val fetchMetadataStore = fetchResourceStore.getMetadataStore();
+
+            // Make sure the path handed to the copy carries a leading slash.
+            val targetPath = target.startsWith("/") ? target : "/" + target;
+
+            logger.info("start to copy target file {} from ResourceStore.", target);
+            UnitOfWork.doInTransactionWithRetry(
+                    UnitOfWorkParams.builder().readonly(true).unitName(target).processor(() -> {
+                        copyResourceStore(targetPath, resourceStore, fetchResourceStore, true, excludeTableExd);
+                        // uuid
+                        val uuid = resourceStore.getResource(ResourceStore.METASTORE_UUID_TAG);
+                        fetchResourceStore.putResourceWithoutCheck(uuid.getResPath(), uuid.getByteSource(),
+                                uuid.getTimestamp(), -1);
+                        return null;
+                    }).build());
+            if (Thread.currentThread().isInterrupted()) {
+                throw new InterruptedException("metadata task is interrupt");
+            }
+            logger.info("start to fetch target file {}", target);
+
+            // fetchResourceStore is read-only, currently we don't do any write operation on it.
+            fetchMetadataStore.dump(fetchResourceStore);
+            logger.info("fetch successfully at {}", fetchPath);
+        }
+    }
+
+    /**
+     * Prints the resources the resource store lists for the folder given by {@code -target}.
+     *
+     * @param optionsHelper parsed command line options
+     * @return the listing returned by the resource store; {@code null} when no target was
+     *         supplied or the store returned {@code null} for it
+     * @throws Exception if the resource store lookup fails
+     */
+    private NavigableSet<String> list(OptionsHelper optionsHelper) throws Exception {
+        val target = optionsHelper.getOptionValue(OPTION_TARGET);
+        // -target is an optional CLI option, so guard it here the same way fetch does
+        // instead of handing a null/blank folder to the resource store.
+        if (StringUtils.isBlank(target)) {
+            System.out.println("target folder must be set with list mode");
+            return null;
+        }
+        val res = resourceStore.listResources(target);
+        if (res == null) {
+            System.out.printf("%s is not exist%n", target);
+        } else {
+            System.out.println(res);
+        }
+        return res;
+    }
+
     private void backup(OptionsHelper optionsHelper) throws Exception {
         val project = optionsHelper.getOptionValue(OPTION_PROJECT);
         var path = optionsHelper.getOptionValue(OPTION_DIR);
diff --git a/src/tool/src/test/java/org/apache/kylin/tool/MetadataToolTest.java b/src/tool/src/test/java/org/apache/kylin/tool/MetadataToolTest.java
index b82216ab9f..d8ddd329aa 100644
--- a/src/tool/src/test/java/org/apache/kylin/tool/MetadataToolTest.java
+++ b/src/tool/src/test/java/org/apache/kylin/tool/MetadataToolTest.java
@@ -124,6 +124,38 @@ public class MetadataToolTest extends NLocalFileMetadataTestCase {
         return tool;
     }
 
+    /**
+     * Fetches one specific file and one whole folder into two named output folders,
+     * then checks the layout of the single-file output.
+     */
+    @Test
+    public void testFetchTargetFile() throws IOException {
+        val junitFolder = temporaryFolder.getRoot();
+        val tool = new MetadataTool(getTestConfig());
+        // test case for fetching a specific file
+        tool.execute(new String[] { "-fetch", "-target", "default/table/DEFAULT.STREAMING_TABLE.json", "-dir",
+                junitFolder.getAbsolutePath(), "-folder", "target_fetch" });
+        // test case for fetching a folder
+        tool.execute(new String[] { "-fetch", "-target", "_global", "-dir", junitFolder.getAbsolutePath(),
+                "-folder", "target_fetch_global" });
+
+        Assertions.assertThat(junitFolder.listFiles()).hasSize(2);
+        // File.listFiles() makes no ordering guarantee, so resolve both output folders
+        // by the names passed via -folder instead of by array index.
+        val archiveFolder = findFile(junitFolder.listFiles(), f -> f.getName().equals("target_fetch"));
+        val globalFolder = findFile(junitFolder.listFiles(), f -> f.getName().equals("target_fetch_global"));
+        Assertions.assertThat(archiveFolder).exists();
+
+        Assertions.assertThat(archiveFolder.list()).isNotEmpty().containsOnly("default", "UUID");
+
+        val projectFolder = findFile(archiveFolder.listFiles(), f -> f.getName().equals("default"));
+        assertProjectFolder(projectFolder, globalFolder);
+    }
+
+    /** Smoke test: running the tool in list mode against "default" must not throw. */
+    @Test
+    public void testListFile() {
+        final String[] listArgs = { "-list", "-target", "default" };
+        val tool = new MetadataTool(getTestConfig());
+        tool.execute(listArgs);
+    }
+
     @Test
     public void testBackupProject() throws IOException {
         val junitFolder = temporaryFolder.getRoot();


[kylin] 01/02: KYLIN-5361 modify email notification function and adjust email hard code to config file

Posted by xx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 1270d6f15e69b47c549beba22594a75603a408b7
Author: cli2 <cl...@ebay.com>
AuthorDate: Fri Jan 13 11:43:33 2023 +0800

    KYLIN-5361 modify email notification function and adjust email hard code to config file
---
 .../setting/SettingAdvanced/SettingAdvanced.vue    |  40 ++-
 .../components/setting/SettingAdvanced/handler.js  |  11 +-
 .../components/setting/SettingAdvanced/locales.js  |   6 +-
 pom.xml                                            |   6 +
 .../rest/request/JobNotificationConfigRequest.java |   6 +-
 .../kylin/rest/response/ProjectConfigResponse.java |   8 +-
 .../apache/kylin/rest/service/ProjectService.java  |  18 +-
 src/core-common/pom.xml                            |   4 +
 .../org/apache/kylin/common/KylinConfigBase.java   |  15 +-
 .../constant/NonCustomProjectLevelConfig.java      |   7 +-
 .../common/util/BasicEmailNotificationContent.java |  62 ----
 .../org/apache/kylin/common/util/MailHelper.java   |  65 ++--
 .../kylin/common/util/MailTemplateProvider.java    |  87 +++++
 .../org/apache/kylin/common/util/StringUtil.java   |   4 +
 .../apache/kylin/common/KylinConfigBaseTest.java   |  12 +-
 .../apache/kylin/common/util/MailServiceTest.java  |   2 +-
 .../apache/kylin/job/constant/JobIssueEnum.java    |   4 +-
 .../kylin/job/execution/AbstractExecutable.java    | 192 +++++++---
 .../kylin/job/execution/DefaultExecutable.java     | 120 ++++---
 .../job/execution/EmailNotificationContent.java    | 211 ++++++++---
 .../kylin/job/execution/ExecutableState.java       |  13 +
 .../kylin/job/execution/NExecutableManager.java    |   5 +-
 .../kylin/job/util/MailNotificationUtil.java       | 104 ++++++
 .../main/resources/mail_templates/JOB_DISCARD.ftl  | 274 +++++++++++++++
 .../main/resources/mail_templates/JOB_ERROR.ftl    | 390 +++++++++++++++++++++
 .../main/resources/mail_templates/JOB_SUCCEED.ftl  | 273 +++++++++++++++
 .../resources/mail_templates/LOAD_EMPTY_DATA.ftl   | 200 +++++++++++
 .../mail_templates/METADATA_PERSIST_FAIL.ftl       | 232 ++++++++++++
 .../mail_templates/OVER_CAPACITY_THRESHOLD.ftl     | 200 +++++++++++
 .../mail_templates/SOURCE_RECORDS_CHANGE.ftl       | 205 +++++++++++
 .../kylin/job/execution/ErrorTestExecutable.java   |   4 +-
 .../job/execution/NExecutableManagerTest.java      | 101 +++++-
 .../kylin/job/execution/SucceedTestExecutable.java |   5 +-
 .../job/impl/threadpool/NDefaultSchedulerTest.java |   3 +-
 .../kylin/metadata/project/ProjectInstance.java    |  10 +
 .../metadata/sourceusage/SourceUsageManager.java   |   6 +-
 .../test_case_data/localmeta/kylin.properties      |  23 +-
 .../apache/kylin/rest/service/JobServiceTest.java  |   7 +-
 .../kylin/rest/controller/NProjectController.java  |   9 +-
 .../rest/controller/NProjectControllerTest.java    |   5 +-
 .../kylin/rest/service/ProjectServiceTest.java     |  14 +-
 .../streaming/jobs/StreamingDFMergeJobTest.java    |   2 +-
 42 files changed, 2654 insertions(+), 311 deletions(-)

diff --git a/kystudio/src/components/setting/SettingAdvanced/SettingAdvanced.vue b/kystudio/src/components/setting/SettingAdvanced/SettingAdvanced.vue
index 031a03f2de..0ea34355a8 100644
--- a/kystudio/src/components/setting/SettingAdvanced/SettingAdvanced.vue
+++ b/kystudio/src/components/setting/SettingAdvanced/SettingAdvanced.vue
@@ -46,13 +46,28 @@
         </span>
         <div class="setting-desc">{{$t('emptyDataLoadDesc')}}</div>
         <div class="split"></div>
-        <span class="setting-label font-medium">{{$t('errorJob')}}</span><span class="setting-value fixed">
+        <span class="setting-label font-medium">{{$t('metaDataPersist')}}</span><span class="setting-value fixed">
           <el-switch
-            v-model="form.job_error_notification_enabled"
+            v-model="form.metadata_persist_notification_enabled"
             :active-text="$t('kylinLang.common.OFF')"
             :inactive-text="$t('kylinLang.common.ON')">
           </el-switch>
         </span>
+        <div class="setting-desc">{{$t('metaDataPersistDesc')}}</div>
+        <div class="split"></div>
+        <span class="setting-label font-medium">{{$t('jobState')}} :</span><span class="setting-value fixed">
+        </span>
+         <span class="setting-value">
+              {{form.job_notification_states.map(states => $t(states)).join(', ')}}
+            </span>
+            <el-checkbox-group class="setting-input" :value="form.job_notification_states" @input="handleInputJobState">
+              <el-checkbox
+                v-for="jobState in jobNotificationStateTypes"
+                :key="jobState"
+                :label="jobState">
+                {{$t(jobState)}}
+              </el-checkbox>
+            </el-checkbox-group>
         <div class="setting-desc">{{$t('errorJobDesc')}}</div>
       </div>
       <div class="setting-item">
@@ -308,7 +323,17 @@ import { Component, Watch } from 'vue-property-decorator'
 import { handleError, handleSuccessAsync, objectArraySort } from '../../../util'
 import { kylinConfirm } from 'util/business'
 import { apiUrl } from '../../../config'
-import { validate, _getJobAlertSettings, _getDefaultDBSettings, _getYarnNameSetting, _getSecStorageSetting, _getExposeCCSetting, _getSnapshotSetting, _getKerberosSettings } from './handler'
+import {
+  validate,
+  _getJobAlertSettings,
+  _getDefaultDBSettings,
+  _getYarnNameSetting,
+  _getSecStorageSetting,
+  _getExposeCCSetting,
+  _getSnapshotSetting,
+  _getKerberosSettings,
+  jobNotificationStateTypes
+} from './handler'
 import EditableBlock from '../../common/EditableBlock/EditableBlock.vue'
 import { pageRefTags, pageCount } from 'config'
 
@@ -377,6 +402,7 @@ export default class SettingAdvanced extends Vue {
   pageSize = pageCount
   convertedProperties = []
   pageRefTags = pageRefTags
+  jobNotificationStateTypes=jobNotificationStateTypes
 
   dbList = []
   nodes = []
@@ -385,8 +411,9 @@ export default class SettingAdvanced extends Vue {
     project: '',
     // tips_enabled: true,
     // threshold: 20,
-    job_error_notification_enabled: true,
+    job_notification_states: [],
     data_load_empty_notification_enabled: true,
+    metadata_persist_notification_enabled: false,
     job_notification_emails: [],
     default_database: this.$store.state.project.projectDefaultDB || '',
     yarn_queue: this.$store.state.project.yarn_queue || '',
@@ -1129,6 +1156,11 @@ export default class SettingAdvanced extends Vue {
     this.pageSize = pageSize
     this.convertedProperties = this.configList.slice(this.pageSize * currentPage, this.pageSize * (currentPage + 1))
   }
+  handleInputJobState (value) {
+    if (value.length >= 0) {
+      this.form.job_notification_states = value
+    }
+  }
 }
 </script>
 
diff --git a/kystudio/src/components/setting/SettingAdvanced/handler.js b/kystudio/src/components/setting/SettingAdvanced/handler.js
index 61ee70b374..3ab1a8c307 100644
--- a/kystudio/src/components/setting/SettingAdvanced/handler.js
+++ b/kystudio/src/components/setting/SettingAdvanced/handler.js
@@ -36,9 +36,10 @@ export function _getJobAlertSettings (data, isArrayDefaultValue, isSort) {
 
   return {
     project: data.project,
-    job_error_notification_enabled: data.job_error_notification_enabled,
+    metadata_persist_notification_enabled: data.metadata_persist_notification_enabled,
     data_load_empty_notification_enabled: data.data_load_empty_notification_enabled,
-    job_notification_emails: jobEmails
+    job_notification_emails: jobEmails,
+    job_notification_states: data.job_notification_states
   }
 }
 
@@ -84,3 +85,9 @@ export function _getKerberosSettings (data) {
     principal: data.principal
   }
 }
+
+// Job states offered as e-mail notification choices in the advanced settings page.
+// SettingAdvanced.vue renders one checkbox per entry and uses each string both as
+// the checkbox label value and as the i18n key passed to $t().
+export const jobNotificationStateTypes = [
+  'Succeed',
+  'Error',
+  'Discard'
+]
diff --git a/kystudio/src/components/setting/SettingAdvanced/locales.js b/kystudio/src/components/setting/SettingAdvanced/locales.js
index 31e384bf1a..0afc2e478e 100644
--- a/kystudio/src/components/setting/SettingAdvanced/locales.js
+++ b/kystudio/src/components/setting/SettingAdvanced/locales.js
@@ -7,8 +7,10 @@ export default {
     jobAlert: 'Email Notification',
     emptyDataLoad: 'Empty Data Job',
     emptyDataLoadDesc: 'Email to the following email address(es) if there is a job loading empty data.',
-    errorJob: 'Error Job',
-    errorJobDesc: 'Email to the following email address(es) if an error occured for the job.',
+    metaDataPersist: 'MetaData Persist To HDFS',
+    metaDataPersistDesc: 'Email to the following email address(es) if metadata persist to HDFS error.',
+    jobState: 'Job State',
+    errorJobDesc: 'Email to the following email address(es) when the issue occurred of the job with the according chosen state(s).',
     emails: 'Email Address:',
     noData: 'No Data',
     pleaseInputEmail: 'Please enter email',
diff --git a/pom.xml b/pom.xml
index 8f8fb7fd4f..049c330a51 100644
--- a/pom.xml
+++ b/pom.xml
@@ -542,6 +542,12 @@
                 <type>test-jar</type>
             </dependency>
 
+            <dependency>
+                <groupId>org.freemarker</groupId>
+                <artifactId>freemarker</artifactId>
+                <version>2.3.31</version>
+            </dependency>
+
             <!-- arthas -->
             <dependency>
                 <groupId>com.taobao.arthas</groupId>
diff --git a/src/common-service/src/main/java/org/apache/kylin/rest/request/JobNotificationConfigRequest.java b/src/common-service/src/main/java/org/apache/kylin/rest/request/JobNotificationConfigRequest.java
index 7d2ef5875c..89885aaf12 100644
--- a/src/common-service/src/main/java/org/apache/kylin/rest/request/JobNotificationConfigRequest.java
+++ b/src/common-service/src/main/java/org/apache/kylin/rest/request/JobNotificationConfigRequest.java
@@ -26,11 +26,13 @@ import lombok.Data;
 
 @Data
 public class JobNotificationConfigRequest {
-    @JsonProperty("job_error_notification_enabled")
-    private Boolean jobErrorNotificationEnabled;
     @JsonProperty("data_load_empty_notification_enabled")
     private Boolean dataLoadEmptyNotificationEnabled;
+    @JsonProperty("job_notification_states")
+    private List<String> jobNotificationStates;
     @JsonProperty("job_notification_emails")
     private List<String> jobNotificationEmails;
+    @JsonProperty("metadata_persist_notification_enabled")
+    private Boolean metadataPersistNotificationEnabled;
 
 }
diff --git a/src/common-service/src/main/java/org/apache/kylin/rest/response/ProjectConfigResponse.java b/src/common-service/src/main/java/org/apache/kylin/rest/response/ProjectConfigResponse.java
index c2c724f490..784731ec9b 100644
--- a/src/common-service/src/main/java/org/apache/kylin/rest/response/ProjectConfigResponse.java
+++ b/src/common-service/src/main/java/org/apache/kylin/rest/response/ProjectConfigResponse.java
@@ -88,8 +88,12 @@ public class ProjectConfigResponse {
     @JsonProperty("retention_range")
     private RetentionRange retentionRange;
 
-    @JsonProperty("job_error_notification_enabled")
-    private boolean jobErrorNotificationEnabled;
+    @JsonProperty("job_notification_states")
+    private List<String> jobNotificationStates;
+
+    @JsonProperty("metadata_persist_notification_enabled")
+    private boolean metadataPersistNotificationEnabled;
+
     @JsonProperty("data_load_empty_notification_enabled")
     private boolean dataLoadEmptyNotificationEnabled;
     @JsonProperty("job_notification_emails")
diff --git a/src/common-service/src/main/java/org/apache/kylin/rest/service/ProjectService.java b/src/common-service/src/main/java/org/apache/kylin/rest/service/ProjectService.java
index 93d8cad3ba..b24de0d000 100644
--- a/src/common-service/src/main/java/org/apache/kylin/rest/service/ProjectService.java
+++ b/src/common-service/src/main/java/org/apache/kylin/rest/service/ProjectService.java
@@ -451,10 +451,12 @@ public class ProjectService extends BasicService {
         Map<String, String> overrideKylinProps = Maps.newHashMap();
         overrideKylinProps.put("kylin.job.notification-on-empty-data-load",
                 String.valueOf(jobNotificationConfigRequest.getDataLoadEmptyNotificationEnabled()));
-        overrideKylinProps.put("kylin.job.notification-on-job-error",
-                String.valueOf(jobNotificationConfigRequest.getJobErrorNotificationEnabled()));
-        overrideKylinProps.put("kylin.job.notification-admin-emails",
+        overrideKylinProps.put("kylin.job.notification-enable-states",
+                String.join(",", Sets.newHashSet(jobNotificationConfigRequest.getJobNotificationStates())));
+        overrideKylinProps.put("kylin.job.notification-user-emails",
                 convertToString(jobNotificationConfigRequest.getJobNotificationEmails()));
+        overrideKylinProps.put("kylin.job.notification-on-metadata-persist",
+                String.valueOf(jobNotificationConfigRequest.getMetadataPersistNotificationEnabled()));
         updateProjectOverrideKylinProps(project, overrideKylinProps);
     }
 
@@ -571,8 +573,9 @@ public class ProjectService extends BasicService {
         response.setFavoriteQueryTipsEnabled(config.getFavoriteQueryAccelerateTipsEnabled());
 
         response.setDataLoadEmptyNotificationEnabled(config.getJobDataLoadEmptyNotificationEnabled());
-        response.setJobErrorNotificationEnabled(config.getJobErrorNotificationEnabled());
-        response.setJobNotificationEmails(Lists.newArrayList(config.getAdminDls()));
+        response.setJobNotificationEmails(projectInstance.getEmailUsers());
+        response.setJobNotificationStates(Lists.newArrayList(config.getJobNotificationStates()));
+        response.setMetadataPersistNotificationEnabled(config.getJobMetadataPersistNotificationEnabled());
 
         response.setFrequencyTimeWindow(config.getFrequencyTimeWindowInDays());
 
@@ -1045,8 +1048,9 @@ public class ProjectService extends BasicService {
     private void resetJobNotificationConfig(String project) {
         Set<String> toBeRemovedProps = Sets.newHashSet();
         toBeRemovedProps.add("kylin.job.notification-on-empty-data-load");
-        toBeRemovedProps.add("kylin.job.notification-on-job-error");
-        toBeRemovedProps.add("kylin.job.notification-admin-emails");
+        toBeRemovedProps.add("kylin.job.notification-enable-states");
+        toBeRemovedProps.add("kylin.job.notification-user-emails");
+        toBeRemovedProps.add("kylin.job.notification-on-metadata-persist");
         removeProjectOveridedProps(project, toBeRemovedProps);
     }
 
diff --git a/src/core-common/pom.xml b/src/core-common/pom.xml
index 47bd12746c..bac48ef867 100644
--- a/src/core-common/pom.xml
+++ b/src/core-common/pom.xml
@@ -35,6 +35,10 @@
     </properties>
 
     <dependencies>
+        <dependency>
+            <groupId>org.freemarker</groupId>
+            <artifactId>freemarker</artifactId>
+        </dependency>
         <dependency>
             <groupId>io.kyligence.ke</groupId>
             <artifactId>kap-external-guava20</artifactId>
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 7bd4da912a..e9d033b675 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1148,6 +1148,17 @@ public abstract class KylinConfigBase implements Serializable {
         return getOptionalStringArray("kylin.job.notification-admin-emails", new String[0]);
     }
 
+    public String[] getJobNotificationStates() {
+        return getOptionalStringArray("kylin.job.notification-enable-states", new String[0]);
+    }
+
+    public int getJobMetadataPersistRetry() {
+        return Integer.parseInt(this.getOptional("kylin.job.metadata-persist-retry", "5"));
+    }
+
+    public Boolean getJobMetadataPersistNotificationEnabled() {
+        return Boolean.parseBoolean(this.getOptional("kylin.job.notification-on-metadata-persist", FALSE));
+    }
     public int getJobRetry() {
         return Integer.parseInt(getOptional("kylin.job.retry", "0"));
     }
@@ -2484,10 +2495,6 @@ public abstract class KylinConfigBase implements Serializable {
         return Boolean.parseBoolean(getOptional("kylin.job.notification-on-empty-data-load", FALSE));
     }
 
-    public boolean getJobErrorNotificationEnabled() {
-        return Boolean.parseBoolean(getOptional("kylin.job.notification-on-job-error", FALSE));
-    }
-
     public Long getStorageResourceSurvivalTimeThreshold() {
         return TimeUtil.timeStringAs(this.getOptional("kylin.storage.resource-survival-time-threshold", "7d"),
                 TimeUnit.MILLISECONDS);
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/constant/NonCustomProjectLevelConfig.java b/src/core-common/src/main/java/org/apache/kylin/common/constant/NonCustomProjectLevelConfig.java
index 5712084642..7158a8bb7f 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/constant/NonCustomProjectLevelConfig.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/constant/NonCustomProjectLevelConfig.java
@@ -40,9 +40,10 @@ public enum NonCustomProjectLevelConfig {
     PUSH_DOWN_ENABLED("kylin.query.pushdown-enabled"),
 
     JOB_DATA_LOAD_EMPTY_NOTIFICATION_ENABLED(
-            "kylin.job.notification-on-empty-data-load"), JOB_ERROR_NOTIFICATION_ENABLED(
-                    "kylin.job.notification-on-job-error"), NOTIFICATION_ADMIN_EMAILS(
-                            "kylin.job.notification-admin-emails"),
+            "kylin.job.notification-on-empty-data-load"), JOB_NOTIFICATION_ENABLED_STATES(
+                    "kylin.job.notification-enable-states"), NOTIFICATION_USER_EMAILS(
+                            "kylin.job.notification-user-emails"), NOTIFICATION_ON_METADATA_PERSIST(
+                                    "kylin.job.notification-on-metadata-persist"),
 
     ENGINE_SPARK_YARN_QUEUE("kylin.engine.spark-conf.spark.yarn.queue"),
 
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/util/BasicEmailNotificationContent.java b/src/core-common/src/main/java/org/apache/kylin/common/util/BasicEmailNotificationContent.java
deleted file mode 100644
index e164197324..0000000000
--- a/src/core-common/src/main/java/org/apache/kylin/common/util/BasicEmailNotificationContent.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.common.util;
-
-import lombok.Getter;
-import lombok.Setter;
-
-@Getter
-@Setter
-public class BasicEmailNotificationContent {
-
-    public static final String NOTIFY_EMAIL_TITLE_TEMPLATE = "[Apache Kylin System Notification] ${issue}";
-    public static final String NOTIFY_EMAIL_BODY_TEMPLATE = "<div style='display:block;word-wrap:break-word;width:80%;font-size:16px;font-family:Microsoft YaHei;'><b>Dear Apache Kylin User,</b><pre><p>"
-            + "<p>${conclusion}</p>" + "<p>Issue: ${issue}<br>" + "Type: ${type}<br>" + "Time: ${time}<br>"
-            + "Project: ${project}<br>" + "Solution: ${solution}</p>" + "<p>Yours sincerely,<br>" + "Apache Kylin Community</p>"
-            + "</pre><div/>";
-
-    public static final String CONCLUSION_FOR_JOB_ERROR = "We found an error job happened in your Apache Kylin system as below. It won't affect your system stability and you may repair it by following instructions.";
-    public static final String CONCLUSION_FOR_LOAD_EMPTY_DATA = "We found a job has loaded empty data in your Apache Kylin system as below. It won't affect your system stability and you may reload data by following instructions.";
-    public static final String CONCLUSION_FOR_SOURCE_RECORDS_CHANGE = "We found some source records updated in your Apache Kylin system. You can reload updated records by following instructions. Ignore this issue may cause query result inconsistency over different indexes.";
-    public static final String CONCLUSION_FOR_OVER_CAPACITY_THRESHOLD = "The amount of data volume used (${volume_used}/${volume_total}) has reached ${capacity_threshold}% of the license’s limit.";
-
-    public static final String SOLUTION_FOR_JOB_ERROR = "You may resume the job first. If still won't work, please send the job's diagnostic package to kyligence technical support.";
-    public static final String SOLUTION_FOR_LOAD_EMPTY_DATA = "You may refresh the empty segment of the model ${model_name} to reload data.";
-    public static final String SOLUTION_FOR_SOURCE_RECORDS_CHANGE = "You may refresh the segment from ${start_time} to ${end_time} to apply source records change.";
-    public static final String SOLUTION_FOR_OVER_CAPACITY_THRESHOLD = "To ensure the availability of your service, please contact Kyligence to get a new license, or try deleting some segments.";
-
-    private String conclusion;
-    private String issue;
-    private String time;
-    private String solution;
-
-    private String jobType;
-    private String project;
-
-    public String getEmailTitle() {
-        return NOTIFY_EMAIL_TITLE_TEMPLATE.replaceAll("\\$\\{issue\\}", issue);
-    }
-
-    public String getEmailBody() {
-        return NOTIFY_EMAIL_BODY_TEMPLATE.replaceAll("\\$\\{conclusion\\}", conclusion)
-                .replaceAll("\\$\\{issue\\}", issue).replaceAll("\\$\\{type\\}", jobType)
-                .replaceAll("\\$\\{time\\}", time).replaceAll("\\$\\{project\\}", project)
-                .replaceAll("\\$\\{solution\\}", solution);
-    }
-}
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/util/MailHelper.java b/src/core-common/src/main/java/org/apache/kylin/common/util/MailHelper.java
index 29875bdb12..2caa3c6e90 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/util/MailHelper.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/util/MailHelper.java
@@ -18,20 +18,22 @@
 
 package org.apache.kylin.common.util;
 
-import java.math.BigDecimal;
-import java.time.Clock;
-import java.time.LocalDate;
-import java.util.Collections;
-import java.util.List;
-
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import org.apache.kylin.common.KylinConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Lists;
+import java.math.BigDecimal;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
 
 public class MailHelper {
 
+    public static final String OVER_CAPACITY_THRESHOLD = "OVER_CAPACITY_THRESHOLD";
+    public static final String CAPACITY = "CAPACITY";
     protected static final Logger logger = LoggerFactory.getLogger(MailHelper.class);
 
     public static List<String> getOverCapacityMailingUsers(KylinConfig kylinConfig) {
@@ -47,24 +49,13 @@ public class MailHelper {
         return users;
     }
 
-    public static Pair<String, String> formatNotifications(BasicEmailNotificationContent content) {
-        if (content == null) {
-            return null;
-        }
-        String title = content.getEmailTitle();
-        String body = content.getEmailBody();
-        return Pair.newPair(title, body);
-    }
-
-    public static boolean notifyUser(KylinConfig kylinConfig, BasicEmailNotificationContent content,
-            List<String> users) {
+    public static boolean notifyUser(KylinConfig kylinConfig, Pair<String, String> mail, List<String> users) {
         try {
             if (users.isEmpty()) {
                 logger.debug("no need to send email, user list is empty.");
                 return false;
             }
-            final Pair<String, String> email = MailHelper.formatNotifications(content);
-            return doSendMail(kylinConfig, users, email);
+            return doSendMail(kylinConfig, users, mail);
         } catch (Exception e) {
             logger.error("error send email", e);
             return false;
@@ -81,27 +72,29 @@ public class MailHelper {
         return new MailService(kylinConfig).sendMail(users, email.getFirst(), email.getSecond());
     }
 
-    public static BasicEmailNotificationContent creatContentForCapacityUsage(Long licenseVolume, Long currentCapacity) {
-        BasicEmailNotificationContent content = new BasicEmailNotificationContent();
-        content.setIssue("Over capacity threshold");
-        content.setTime(LocalDate.now(Clock.systemDefaultZone()).toString());
-        content.setJobType("CHECK_USAGE");
-        content.setProject("NULL");
+    public static Pair<String, String> creatContentForCapacityUsage(Long licenseVolume, Long currentCapacity, String resourceName) {
 
         String readableCurrentCapacity = SizeConvertUtil.getReadableFileSize(currentCapacity);
         String readableLicenseVolume = SizeConvertUtil.getReadableFileSize(licenseVolume);
         double overCapacityThreshold = KylinConfig.getInstanceFromEnv().getOverCapacityThreshold() * 100;
-        content.setConclusion(BasicEmailNotificationContent.CONCLUSION_FOR_OVER_CAPACITY_THRESHOLD
-                .replaceAll("\\$\\{volume_used\\}", readableCurrentCapacity)
-                .replaceAll("\\$\\{volume_total\\}", readableLicenseVolume)
-                .replaceAll("\\$\\{capacity_threshold\\}", BigDecimal.valueOf(overCapacityThreshold).toString()));
-        content.setSolution(BasicEmailNotificationContent.SOLUTION_FOR_OVER_CAPACITY_THRESHOLD);
-        return content;
+        KylinConfig env = KylinConfig.getInstanceFromEnv();
+
+        Map<String, Object> dataMap = Maps.newHashMap();
+        dataMap.put("resource_name", resourceName);
+        dataMap.put("volume_used", readableCurrentCapacity);
+        dataMap.put("volume_total", readableLicenseVolume);
+        dataMap.put("capacity_threshold", BigDecimal.valueOf(overCapacityThreshold).toString());
+        dataMap.put("env_name", KylinConfig.getInstanceFromEnv().getDeployEnv());
+        String title = getMailTitle(CAPACITY,
+                OVER_CAPACITY_THRESHOLD,
+                env.getMetadataUrlPrefix(),
+                env.getDeployEnv());
+        String content = MailTemplateProvider.getInstance().buildMailContent(OVER_CAPACITY_THRESHOLD, dataMap);
+        return Pair.newPair(title, content);
     }
 
-    public static boolean notifyUserForOverCapacity(Long licenseVolume, Long currentCapacity) {
-        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-        List<String> users = getOverCapacityMailingUsers(kylinConfig);
-        return notifyUser(kylinConfig, creatContentForCapacityUsage(licenseVolume, currentCapacity), users);
+
+    public static String getMailTitle(String... titleParts) {
+        return "[" + Joiner.on("]-[").join(titleParts) + "]";
     }
 }
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/util/MailTemplateProvider.java b/src/core-common/src/main/java/org/apache/kylin/common/util/MailTemplateProvider.java
new file mode 100644
index 0000000000..453a240692
--- /dev/null
+++ b/src/core-common/src/main/java/org/apache/kylin/common/util/MailTemplateProvider.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.util;
+
+
+import freemarker.template.Configuration;
+import freemarker.template.Template;
+import org.apache.kylin.common.KylinConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.io.Writer;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.kylin.common.util.MailHelper.creatContentForCapacityUsage;
+import static org.apache.kylin.common.util.MailHelper.getOverCapacityMailingUsers;
+import static org.apache.kylin.common.util.MailHelper.notifyUser;
+
+/**
+ * Use a key to find a template for email.
+ *
+ * The template file is [KEY].ftl file under /mail_templates directory with classloader.
+ */
+public class MailTemplateProvider {
+
+    private static final Logger logger = LoggerFactory.getLogger(MailTemplateProvider.class);
+
+    private static final MailTemplateProvider INSTANCE = new MailTemplateProvider();
+
+    public static MailTemplateProvider getInstance() {
+        return INSTANCE;
+    }
+
+    private final Configuration configuration;
+
+    private MailTemplateProvider() {
+        configuration = new Configuration(Configuration.getVersion());
+        configuration.setClassForTemplateLoading(MailTemplateProvider.class, "/mail_templates");
+        configuration.setDefaultEncoding("UTF-8");
+    }
+
+    /**
+     * Renders the template file [tplKey].ftl with the given data model.
+     *
+     * @return the rendered text, or the exception's localized message when loading or rendering fails
+     */
+    public String buildMailContent(String tplKey, Map<String, Object> data) {
+        // No null check on the template: a missing template surfaces as an exception handled below.
+        try (Writer out = new StringWriter()) {
+            getTemplate(tplKey).process(data, out);
+            return out.toString();
+        } catch (Exception e) {
+            // Hand the exception itself to the logger so the stack trace is kept, not only the message.
+            logger.error("build mail content failed for template {}", tplKey, e);
+            return e.getLocalizedMessage();
+        }
+    }
+
+    private Template getTemplate(String tplKey) throws IOException {
+        return configuration.getTemplate(tplKey + ".ftl");
+    }
+
+    public static boolean notifyUserForOverCapacity(Long licenseVolume, Long currentCapacity, String resourceName) {
+        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        List<String> users = getOverCapacityMailingUsers(kylinConfig);
+        return notifyUser(kylinConfig, creatContentForCapacityUsage(licenseVolume, currentCapacity, resourceName), users);
+    }
+}
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/util/StringUtil.java b/src/core-common/src/main/java/org/apache/kylin/common/util/StringUtil.java
index 32af714687..5776a5895b 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/util/StringUtil.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/util/StringUtil.java
@@ -209,4 +209,8 @@ public class StringUtil {
         return "true".equals(s) || "false".equals(s);
     }
 
+    public static String[] split(String str, String splitBy) {
+        return str.split(splitBy);
+    }
+
 }
diff --git a/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java b/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java
index f60a2b003b..d037fea7f7 100644
--- a/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java
+++ b/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java
@@ -627,9 +627,11 @@ class KylinConfigBaseTest {
 
         map.put("getJobDataLoadEmptyNotificationEnabled",
                 new PropertiesEntity("kylin.job.notification-on-empty-data-load", "false", false));
+        map.put("getJobNotificationStates",
+                new PropertiesEntity("kylin.job.notification-enable-states", "", new String[0]));
 
-        map.put("getJobErrorNotificationEnabled",
-                new PropertiesEntity("kylin.job.notification-on-job-error", "false", false));
+        map.put("getJobMetadataPersistNotificationEnabled",
+                new PropertiesEntity("kylin.job.notification-on-metadata-persist", "false", false));
 
         map.put("getStorageResourceSurvivalTimeThreshold",
                 new PropertiesEntity("kylin.storage.resource-survival-time-threshold", "7d", 7L * 24 * 60 * 60 * 1000));
@@ -899,7 +901,7 @@ class KylinConfigBaseTest {
         map.put("isSkipBasicAuthorization",
                 new PropertiesEntity("kap.authorization.skip-basic-authorization", "false", false));
         map.put("getMetricsQuerySlaSeconds",
-                new PropertiesEntity("kylin.metrics.query.sla.seconds", "1,3,a,15,60", new long[] { 3, 15, 60 }));
+                new PropertiesEntity("kylin.metrics.query.sla.seconds", "1,3,15,60", new long[] { 1, 3, 15, 60 }));
         map.put("getMetricsJobSlaMinutes",
                 new PropertiesEntity("kylin.metrics.job.sla.minutes", "1,30,60,300", new long[] { 1, 30, 60, 300 }));
         map.put("isMetadataKeyCaseInSensitiveEnabled",
@@ -982,9 +984,9 @@ class KylinConfigBaseTest {
     @Test
     void testGetNonCustomProjectConfigs() {
         KylinConfig config = KylinConfig.getInstanceFromEnv();
-        assertEquals(19, config.getNonCustomProjectConfigs().size());
-        config.setProperty("kylin.server.non-custom-project-configs", "kylin.job.retry");
         assertEquals(20, config.getNonCustomProjectConfigs().size());
+        config.setProperty("kylin.server.non-custom-project-configs", "kylin.job.retry");
+        assertEquals(21, config.getNonCustomProjectConfigs().size());
     }
 
     @Test
diff --git a/src/core-common/src/test/java/org/apache/kylin/common/util/MailServiceTest.java b/src/core-common/src/test/java/org/apache/kylin/common/util/MailServiceTest.java
index b9767afc59..8a1ce6e54f 100644
--- a/src/core-common/src/test/java/org/apache/kylin/common/util/MailServiceTest.java
+++ b/src/core-common/src/test/java/org/apache/kylin/common/util/MailServiceTest.java
@@ -65,7 +65,7 @@ public class MailServiceTest extends AbstractTestCase {
     public void testMailHelper() {
         overwriteSystemProp("kylin.capacity.notification-enabled", "true");
         overwriteSystemProp("kylin.capacity.notification-emails", "foobar@foobar.com");
-        boolean sent = MailHelper.notifyUserForOverCapacity(100L, 81L);
+        boolean sent = MailTemplateProvider.notifyUserForOverCapacity(100L, 81L, "abc");
         assert sent;
     }
 
diff --git a/src/core-job/src/main/java/org/apache/kylin/job/constant/JobIssueEnum.java b/src/core-job/src/main/java/org/apache/kylin/job/constant/JobIssueEnum.java
index bc848a436b..60ee3afeb0 100644
--- a/src/core-job/src/main/java/org/apache/kylin/job/constant/JobIssueEnum.java
+++ b/src/core-job/src/main/java/org/apache/kylin/job/constant/JobIssueEnum.java
@@ -21,8 +21,8 @@ package org.apache.kylin.job.constant;
 import lombok.Getter;
 
 public enum JobIssueEnum {
-    JOB_ERROR("Error Job"), LOAD_EMPTY_DATA("Load Empty Data"), SOURCE_RECORDS_CHANGE(
-            "Source Records Change"), OVER_CAPACITY_THRESHOLD("Over capacity threshold");
+    LOAD_EMPTY_DATA("Load Empty Data"), SOURCE_RECORDS_CHANGE(
+            "Source Records Change");
 
     @Getter
     private final String displayName;
diff --git a/src/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/src/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
index 767b1ba183..a8c8857a75 100644
--- a/src/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
+++ b/src/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
@@ -36,14 +36,10 @@
 
 package org.apache.kylin.job.execution;
 
-import static org.apache.kylin.job.constant.ExecutableConstants.MR_JOB_ID;
-import static org.apache.kylin.job.constant.ExecutableConstants.YARN_APP_ID;
-import static org.apache.kylin.job.constant.ExecutableConstants.YARN_APP_URL;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.IllegalFormatException;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -59,6 +55,7 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.constant.NonCustomProjectLevelConfig;
 import org.apache.kylin.common.metrics.MetricsCategory;
 import org.apache.kylin.common.metrics.MetricsGroup;
 import org.apache.kylin.common.metrics.MetricsName;
@@ -73,6 +70,7 @@ import org.apache.kylin.job.dao.ExecutablePO;
 import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.exception.JobStoppedException;
 import org.apache.kylin.job.exception.JobStoppedNonVoluntarilyException;
+import org.apache.kylin.job.exception.PersistentException;
 import org.apache.kylin.metadata.cube.model.NBatchConstants;
 import org.apache.kylin.metadata.cube.model.NDataLayout;
 import org.apache.kylin.metadata.model.NDataModel;
@@ -104,8 +102,8 @@ public abstract class AbstractExecutable implements Executable {
     public interface Callback {
         void process() throws Exception;
 
-        default void onProcessError(Throwable throwable) {
-        }
+//        default void onProcessError(Throwable throwable) {
+//        }
     }
 
     protected static final String SUBMITTER = "submitter";
@@ -319,17 +317,17 @@ public abstract class AbstractExecutable implements Executable {
     }
 
     public void updateJobOutput(String project, String jobId, ExecutableState newStatus, Map<String, String> info,
-            String output, Consumer<String> hook) {
+            String output, Consumer<String> hook) throws ExecuteException, PersistentException, InterruptedException {
         updateJobOutput(project, jobId, newStatus, info, output, null, hook);
     }
 
     public void updateJobOutput(String project, String jobId, ExecutableState newStatus, Map<String, String> info,
-            String output, String failedMsg, Consumer<String> hook) {
+            String output, String failedMsg, Consumer<String> hook) throws ExecuteException, PersistentException, InterruptedException {
         updateJobOutput(project, jobId, newStatus, info, output, this.getLogPath(), failedMsg, hook);
     }
 
     public void updateJobOutput(String project, String jobId, ExecutableState newStatus, Map<String, String> info,
-            String output, String logPath, String failedMsg, Consumer<String> hook) {
+            String output, String logPath, String failedMsg, Consumer<String> hook) throws ExecuteException, PersistentException, InterruptedException {
         EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
             NExecutableManager executableManager = getExecutableManager(project);
             val existedInfo = executableManager.getOutput(jobId).getExtra();
@@ -353,10 +351,11 @@ public abstract class AbstractExecutable implements Executable {
         }, project, UnitOfWork.DEFAULT_MAX_RETRY, getEpochId(), getTempLockName());
 
         //write output to HDFS
-        updateJobOutputToHDFS(project, jobId, output, logPath);
+        updateJobOutputWithPersistCheck(project, jobId, output, logPath);
     }
 
-    private static void updateJobOutputToHDFS(String project, String jobId, String output, String logPath) {
+    private static void updateJobOutputToHDFS(String project, String jobId, String output, String logPath)
+            throws PersistentException {
         NExecutableManager nExecutableManager = getExecutableManager(project);
         ExecutableOutputPO jobOutput = nExecutableManager.getJobOutput(jobId);
         if (null != output) {
@@ -544,7 +543,7 @@ public abstract class AbstractExecutable implements Executable {
     }
 
     // Ensure metadata compatibility
-    public abstract ExecuteResult doWork(ExecutableContext context) throws ExecuteException;
+    protected abstract ExecuteResult doWork(ExecutableContext context) throws ExecuteException, PersistentException, InterruptedException;
 
     @Override
     public boolean isRunnable() {
@@ -595,23 +594,24 @@ public abstract class AbstractExecutable implements Executable {
         val projectConfig = NProjectManager.getInstance(getConfig()).getProject(project).getConfig();
         boolean needNotification = true;
         switch (jobIssue) {
-        case JOB_ERROR:
-            needNotification = projectConfig.getJobErrorNotificationEnabled();
-            break;
         case LOAD_EMPTY_DATA:
             needNotification = projectConfig.getJobDataLoadEmptyNotificationEnabled();
+            String state = checkStateIfOverride(NonCustomProjectLevelConfig.JOB_DATA_LOAD_EMPTY_NOTIFICATION_ENABLED.getValue());
+            needNotification = state == null ? needNotification : Boolean.parseBoolean(state);
             break;
-        case SOURCE_RECORDS_CHANGE:
+        case SOURCE_RECORDS_CHANGE: //todo source record change
             needNotification = projectConfig.getJobSourceRecordsChangeNotificationEnabled();
             break;
         default:
             throw new IllegalArgumentException(String.format(Locale.ROOT, "no process for jobIssue: %s.", jobIssue));
         }
+        List<String> users;
+        users = getOverrideNotifyUsers();
+
         if (!needNotification) {
             return;
         }
-        List<String> users;
-        users = getAllNotifyUsers(projectConfig);
+
         if (this instanceof DefaultExecutable) {
             MailHelper.notifyUser(projectConfig, EmailNotificationContent.createContent(jobIssue, this), users);
         } else {
@@ -620,6 +620,21 @@ public abstract class AbstractExecutable implements Executable {
         }
     }
 
+    public final void notifyUserStatusChange(ExecutableState state) {
+        Preconditions.checkState(
+                (this instanceof DefaultExecutable) || this.getParent() instanceof DefaultExecutable);
+        val projectConfig = NProjectManager.getInstance(getConfig()).getProject(project).getConfig();
+        List<String> users = getOverrideNotifyUsers();
+        if (this instanceof DefaultExecutable) {
+            MailHelper.notifyUser(projectConfig, EmailNotificationContent.createContent(state,
+                    this, ((DefaultExecutable) this).getTasks()), users);
+        } else {
+            MailHelper.notifyUser(projectConfig, EmailNotificationContent.createContent(state,
+                            this.getParent(), ((DefaultExecutable) this.getParent()).getTasks()), users);
+        }
+    }
+
+
     public void setSparkYarnQueueIfEnabled(String project, String yarnQueue) {
         ProjectInstance proj = NProjectManager.getInstance(KylinConfig.getInstanceFromEnv()).getProject(project);
         KylinConfig config = proj.getConfig();
@@ -657,32 +672,32 @@ public abstract class AbstractExecutable implements Executable {
     }
 
     //will modify input info
-    public Map<String, String> makeExtraInfo(Map<String, String> info) {
-        if (info == null) {
-            return Maps.newHashMap();
-        }
-
-        // post process
-        if (info.containsKey(MR_JOB_ID) && !info.containsKey(YARN_APP_ID)) {
-            String jobId = info.get(MR_JOB_ID);
-            if (jobId.startsWith("job_")) {
-                info.put(YARN_APP_ID, jobId.replace("job_", "application_"));
-            }
-        }
-
-        if (info.containsKey(YARN_APP_ID)
-                && !org.apache.commons.lang3.StringUtils.isEmpty(getConfig().getJobTrackingURLPattern())) {
-            String pattern = getConfig().getJobTrackingURLPattern();
-            try {
-                String newTrackingURL = String.format(Locale.ROOT, pattern, info.get(YARN_APP_ID));
-                info.put(YARN_APP_URL, newTrackingURL);
-            } catch (IllegalFormatException ife) {
-                logger.error("Illegal tracking url pattern: {}", getConfig().getJobTrackingURLPattern());
-            }
-        }
-
-        return info;
-    }
+//    public Map<String, String> makeExtraInfo(Map<String, String> info) {
+//        if (info == null) {
+//            return Maps.newHashMap();
+//        }
+//
+//        // post process
+//        if (info.containsKey(MR_JOB_ID) && !info.containsKey(YARN_APP_ID)) {
+//            String jobId = info.get(MR_JOB_ID);
+//            if (jobId.startsWith("job_")) {
+//                info.put(YARN_APP_ID, jobId.replace("job_", "application_"));
+//            }
+//        }
+//
+//        if (info.containsKey(YARN_APP_ID)
+//                && !org.apache.commons.lang3.StringUtils.isEmpty(getConfig().getJobTrackingURLPattern())) {
+//            String pattern = getConfig().getJobTrackingURLPattern();
+//            try {
+//                String newTrackingURL = String.format(Locale.ROOT, pattern, info.get(YARN_APP_ID));
+//                info.put(YARN_APP_URL, newTrackingURL);
+//            } catch (IllegalFormatException ife) {
+//                logger.error("Illegal tracking url pattern: {}", getConfig().getJobTrackingURLPattern());
+//            }
+//        }
+//
+//        return info;
+//    }
 
     public static long getStartTime(Output output) {
         return output.getStartTime();
@@ -939,4 +954,93 @@ public abstract class AbstractExecutable implements Executable {
         }
         return getParentId();
     }
+
+    /**
+     * Writes the job output to HDFS, retrying while the failure is a metadata persist error.
+     *
+     * <p>Each failed attempt that will be retried is followed by a back-off of {@code 4^attempt}
+     * seconds. A failure that is not persist-related is rethrown immediately. Once the configured
+     * number of retries is used up, the last persist failure is handed to
+     * {@link #checkMetadataPersistConfig(Throwable)}.
+     *
+     * @throws ExecuteException if retries are exhausted and persist-failure notification is enabled
+     * @throws InterruptedException if the thread is interrupted during the back-off sleep
+     */
+    protected void updateJobOutputWithPersistCheck(String project, String jobId, String output, String logPath)
+            throws ExecuteException, PersistentException, InterruptedException {
+        Throwable exception;
+        int retryCnt = 0;
+        do {
+            exception = null;
+            retryCnt++;
+            try {
+                updateJobOutputToHDFS(project, jobId, output, logPath);
+            } catch (Exception e) {
+                // Pass the exception itself as the last argument so the stack trace is logged too.
+                logger.error("update Job Output failed due to : {}", e.getMessage(), e);
+                if (!isMetaDataPersistException(e, 5)) {
+                    throw e;
+                }
+                exception = e;
+                // Back off only when another attempt follows; sleeping after the final attempt
+                // would merely delay the failure handling by the longest interval.
+                if (retryCnt <= context.getConfig().getJobMetadataPersistRetry()) {
+                    Thread.sleep(1000L * (long) Math.pow(4, retryCnt));
+                }
+            }
+        } while (exception != null && retryCnt <= context.getConfig().getJobMetadataPersistRetry());
+
+        if (exception != null) {
+            checkMetadataPersistConfig(exception);
+        }
+    }
+
+    /**
+     * Decides whether a metadata persist failure must be reported and fail the job.
+     *
+     * <p>The project-level override is used when it is set; otherwise the global configuration
+     * decides. When reporting is on, users are notified and the failure is rethrown wrapped in an
+     * {@link ExecuteException}; when it is off this method returns normally.
+     */
+    protected void checkMetadataPersistConfig(Throwable exception) throws ExecuteException {
+        final String override = checkStateIfOverride(
+                NonCustomProjectLevelConfig.NOTIFICATION_ON_METADATA_PERSIST.getValue());
+        final boolean notificationEnabled;
+        if (override == null) {
+            notificationEnabled = this.getConfig().getJobMetadataPersistNotificationEnabled();
+        } else {
+            notificationEnabled = Boolean.parseBoolean(override);
+        }
+        if (!notificationEnabled) {
+            return;
+        }
+        handleMetadataPersistException(exception);
+        throw new ExecuteException(exception);
+    }
+
+    /**
+     * Mails the metadata persist failure to the configured recipients.
+     *
+     * <p>Nothing is sent when the recipient list is empty. The mail describes this executable when
+     * it is a {@link DefaultExecutable}, otherwise its parent.
+     */
+    protected void handleMetadataPersistException(Throwable exception) {
+        final List<String> recipients = getOverrideNotifyUsers();
+        if (recipients == null || recipients.isEmpty()) {
+            logger.warn("no need to send email, user list is empty.");
+            return;
+        }
+        final AbstractExecutable subject = (this instanceof DefaultExecutable) ? this : this.getParent();
+        MailHelper.notifyUser(getConfig(),
+                EmailNotificationContent.createMetadataPersistExceptionContent(exception, subject), recipients);
+    }
+
+    /**
+     * Tells whether {@code e} or one of its causes is a {@link PersistentException}.
+     *
+     * @param e the exception to inspect
+     * @param maxDepth how many links of the cause chain are examined, not counting {@code e} itself
+     * @return {@code true} when a {@link PersistentException} is found within that depth
+     */
+    protected boolean isMetaDataPersistException(Exception e, final int maxDepth) {
+        if (e instanceof PersistentException) {
+            return true;
+        }
+        Throwable cause = e.getCause();
+        for (int depth = 0; cause != null && depth < maxDepth; depth++) {
+            if (cause instanceof PersistentException) {
+                return true;
+            }
+            cause = cause.getCause();
+        }
+        return false;
+    }
+
+    /**
+     * Reads the project-level override of the given configuration key.
+     *
+     * <p>The project is taken from this executable when it is a {@link DefaultExecutable}, otherwise
+     * from its parent.
+     *
+     * @param state name of the configuration key to look up
+     * @return the value returned by {@code EmailNotificationContent.checkOverrideConfig}
+     */
+    private String checkStateIfOverride(String state) {
+        if (this instanceof DefaultExecutable) {
+            return EmailNotificationContent.checkOverrideConfig(this.getProject(), state);
+        }
+        return EmailNotificationContent.checkOverrideConfig(this.getParent().getProject(), state);
+    }
+
+    /**
+     * Returns the recipients from {@code getAllNotifyUsers}, extended with the comma-separated
+     * addresses of the project-level override when one is set.
+     */
+    private List<String> getOverrideNotifyUsers() {
+        final String projectEmails = checkStateIfOverride(
+                NonCustomProjectLevelConfig.NOTIFICATION_USER_EMAILS.getValue());
+        final List<String> recipients = getAllNotifyUsers(getConfig());
+        if (projectEmails == null) {
+            return recipients;
+        }
+        // Appends in place to the list handed back by getAllNotifyUsers.
+        recipients.addAll(Arrays.asList(StringUtils.split(projectEmails, ",")));
+        return recipients;
+    }
 }
diff --git a/src/core-job/src/main/java/org/apache/kylin/job/execution/DefaultExecutable.java b/src/core-job/src/main/java/org/apache/kylin/job/execution/DefaultExecutable.java
index 6cbeefee56..3f22a1c502 100644
--- a/src/core-job/src/main/java/org/apache/kylin/job/execution/DefaultExecutable.java
+++ b/src/core-job/src/main/java/org/apache/kylin/job/execution/DefaultExecutable.java
@@ -35,10 +35,10 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.transaction.UnitOfWork;
 import org.apache.kylin.common.scheduler.EventBusFactory;
 import org.apache.kylin.common.scheduler.JobFinishedNotifier;
-import org.apache.kylin.job.constant.JobIssueEnum;
 import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.exception.ExecuteRuntimeException;
 import org.apache.kylin.job.exception.JobStoppedException;
+import org.apache.kylin.job.exception.PersistentException;
 import org.apache.kylin.metadata.project.EnhancedUnitOfWork;
 
 import io.kyligence.kap.guava20.shaded.common.collect.Lists;
@@ -220,68 +220,22 @@ public class DefaultExecutable extends AbstractExecutable implements ChainedExec
 
     @Override
     protected void onExecuteFinished(ExecuteResult result) throws JobStoppedException {
-        List<? extends Executable> jobs = getTasks();
-        boolean allSucceed = true;
-        boolean hasError = false;
-        boolean hasDiscarded = false;
-        boolean hasSuicidal = false;
-        boolean hasPaused = false;
-        for (Executable task : jobs) {
-            logger.info("Sub-task finished {}, state: {}", task.getDisplayName(), task.getStatus());
-            boolean taskSucceed = false;
-            switch (task.getStatus()) {
-            case RUNNING:
-                hasError = true;
-                break;
-            case ERROR:
-                hasError = true;
-                break;
-            case DISCARDED:
-                hasDiscarded = true;
-                break;
-            case SUICIDAL:
-                hasSuicidal = true;
-                break;
-            case PAUSED:
-                hasPaused = true;
-                break;
-            case SKIP:
-            case SUCCEED:
-                taskSucceed = true;
-                break;
-            default:
-                break;
-            }
-            allSucceed &= taskSucceed;
-        }
-
-        ExecutableState state;
-        if (allSucceed) {
-            state = ExecutableState.SUCCEED;
-        } else if (hasDiscarded) {
-            state = ExecutableState.DISCARDED;
-        } else if (hasSuicidal) {
-            state = ExecutableState.SUICIDAL;
-        } else if (hasError) {
-            state = ExecutableState.ERROR;
-        } else if (hasPaused) {
-            state = ExecutableState.PAUSED;
-        } else {
-            state = ExecutableState.READY;
-        }
-
+        ExecutableState state = checkState();
         logger.info("Job finished {}, state:{}", this.getDisplayName(), state);
 
         EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
             switch (state) {
             case SUCCEED:
                 updateToFinalState(ExecutableState.SUCCEED, this::afterUpdateOutput, result.getShortErrMsg());
+                onStatusChange(ExecutableState.SUCCEED);
                 break;
             case DISCARDED:
                 updateToFinalState(ExecutableState.DISCARDED, this::onExecuteDiscardHook, result.getShortErrMsg());
+                onStatusChange(ExecutableState.DISCARDED);
                 break;
             case SUICIDAL:
                 updateToFinalState(ExecutableState.SUICIDAL, this::onExecuteSuicidalHook, result.getShortErrMsg());
+                onStatusChange(ExecutableState.SUICIDAL);
                 break;
             case ERROR:
             case PAUSED:
@@ -297,13 +251,15 @@ public class DefaultExecutable extends AbstractExecutable implements ChainedExec
                 String shortErrMsg = null;
                 if (state == ExecutableState.ERROR) {
                     logger.warn("[UNEXPECTED_THINGS_HAPPENED] Unexpected ERROR state discovered here!!!");
-                    notifyUserJobIssue(JobIssueEnum.JOB_ERROR);
                     info = result.getExtraInfo();
                     output = result.getErrorMsg();
                     hook = this::onExecuteErrorHook;
                     shortErrMsg = result.getShortErrMsg();
                 }
                 updateJobOutput(getProject(), getId(), state, info, output, shortErrMsg, hook);
+                if (state == ExecutableState.ERROR) {
+                    onStatusChange(ExecutableState.ERROR);
+                }
                 break;
             default:
                 throw new IllegalArgumentException("Illegal state when job finished: " + state);
@@ -320,6 +276,60 @@ public class DefaultExecutable extends AbstractExecutable implements ChainedExec
                         result.succeed(), getJobStartTime(), getJobEndTime(), getTag(), result.getThrowable()));
     }
 
+    /**
+     * Derives the state of this job from the states of its sub-tasks.
+     *
+     * <p>The job is SUCCEED only when every sub-task is SKIP or SUCCEED. Otherwise the first match in
+     * the order DISCARDED, SUICIDAL, ERROR (a RUNNING sub-task counts as an error), PAUSED wins, and
+     * READY is returned when none applies. Every sub-task is logged before the decision is made.
+     */
+    private ExecutableState checkState() {
+        boolean everyTaskSucceeded = true;
+        boolean sawError = false;
+        boolean sawDiscarded = false;
+        boolean sawSuicidal = false;
+        boolean sawPaused = false;
+        for (Executable subTask : getTasks()) {
+            logger.info("Sub-task finished {}, state: {}", subTask.getDisplayName(), subTask.getStatus());
+            switch (subTask.getStatus()) {
+            case SKIP:
+            case SUCCEED:
+                // A successful sub-task leaves every flag untouched; move on to the next one.
+                continue;
+            case RUNNING:
+            case ERROR:
+                sawError = true;
+                break;
+            case DISCARDED:
+                sawDiscarded = true;
+                break;
+            case SUICIDAL:
+                sawSuicidal = true;
+                break;
+            case PAUSED:
+                sawPaused = true;
+                break;
+            default:
+                break;
+            }
+            // Reached for every sub-task that is neither SKIP nor SUCCEED.
+            everyTaskSucceeded = false;
+        }
+
+        if (everyTaskSucceeded) {
+            return ExecutableState.SUCCEED;
+        }
+        if (sawDiscarded) {
+            return ExecutableState.DISCARDED;
+        }
+        if (sawSuicidal) {
+            return ExecutableState.SUICIDAL;
+        }
+        if (sawError) {
+            return ExecutableState.ERROR;
+        }
+        if (sawPaused) {
+            return ExecutableState.PAUSED;
+        }
+        return ExecutableState.READY;
+    }
+
     private long getJobStartTime() {
         return subTasks.stream().map(AbstractExecutable::getStartTime).filter(t -> t != 0)
                 .min(Comparator.comparingLong(t -> t)).orElse(0L);
@@ -355,7 +365,8 @@ public class DefaultExecutable extends AbstractExecutable implements ChainedExec
         // Hook method, default action is doing nothing
     }
 
-    private void updateToFinalState(ExecutableState finalState, Consumer<String> hook, String failedMsg) {
+    private void updateToFinalState(ExecutableState finalState, Consumer<String> hook, String failedMsg)
+            throws PersistentException, ExecuteException, InterruptedException {
         //to final state, regardless of isStoppedNonVoluntarily, otherwise a paused job might fail to suicide
         if (!getOutput().getState().isFinalState()) {
             updateJobOutput(getProject(), getId(), finalState, null, null, failedMsg, hook);
@@ -397,4 +408,7 @@ public class DefaultExecutable extends AbstractExecutable implements ChainedExec
         // just implement it
     }
 
+    /**
+     * Forwards {@code state} to {@code notifyUserStatusChange} of the superclass.
+     * NOTE(review): presumably this is what sends the status e-mail - confirm against the superclass.
+     */
+    protected void onStatusChange(ExecutableState state) {
+        super.notifyUserStatusChange(state);
+    }
 }
diff --git a/src/core-job/src/main/java/org/apache/kylin/job/execution/EmailNotificationContent.java b/src/core-job/src/main/java/org/apache/kylin/job/execution/EmailNotificationContent.java
index f40f5a45ef..5106cc16cc 100644
--- a/src/core-job/src/main/java/org/apache/kylin/job/execution/EmailNotificationContent.java
+++ b/src/core-job/src/main/java/org/apache/kylin/job/execution/EmailNotificationContent.java
@@ -18,52 +18,177 @@
 
 package org.apache.kylin.job.execution;
 
-import java.time.Clock;
-import java.time.LocalDate;
-import java.util.Locale;
-
-import org.apache.kylin.common.util.BasicEmailNotificationContent;
+import com.google.common.collect.Maps;
+import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.constant.NonCustomProjectLevelConfig;
 import org.apache.kylin.common.util.DateFormat;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.common.util.StringUtil;
+import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.constant.JobIssueEnum;
+import org.apache.kylin.job.util.MailNotificationUtil;
+import org.apache.kylin.metadata.project.NProjectManager;
+import org.apache.kylin.metadata.project.ProjectInstance;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+
+
+public class EmailNotificationContent {
+
+    private static final Logger logger = LoggerFactory.getLogger(EmailNotificationContent.class);
+
+    private EmailNotificationContent(){}
+
+    /**
+     * Builds the mail for a job issue.
+     *
+     * <p>For SOURCE_RECORDS_CHANGE the formatted start and end time of the executable are added to
+     * the template data.
+     *
+     * @return the mail as a (title, content) pair, or {@code null} when the issue is not one that
+     *         users are notified about
+     */
+    public static Pair<String, String> createContent(JobIssueEnum jobIssue, AbstractExecutable executable) {
+        if (!checkState(jobIssue)) {
+            logger.info("issue state: {} not need to notify users", jobIssue);
+            return null;
+        }
+        logger.info("notify on jobIssue change : {}", jobIssue);
+
+        final Map<String, Object> templateData = getDataMap(executable);
+        if (JobIssueEnum.SOURCE_RECORDS_CHANGE.equals(jobIssue)) {
+            final String startTime = DateFormat.formatToDateStr(executable.getStartTime(),
+                    DateFormat.DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS);
+            templateData.put("start_time", startTime);
+            final String endTime = DateFormat.formatToDateStr(executable.getEndTime(),
+                    DateFormat.DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS);
+            templateData.put("end_time", endTime);
+        }
+        final String title = getMailTitle(jobIssue, executable);
+        final String content = getMailContent(jobIssue, templateData);
+        return Pair.newPair(title, content);
+    }
+
+    /**
+     * Builds the mail announcing that a job reached {@code state}.
+     *
+     * <p>Only ERROR and final states are considered, and only when the state is listed in the
+     * notification states (project-level override first, global configuration otherwise).
+     *
+     * @param tasks sub-tasks searched for the failing step when {@code state} is ERROR
+     * @return the mail as a (title, content) pair, or {@code null} when no mail should be sent
+     */
+    public static Pair<String, String> createContent(ExecutableState state, AbstractExecutable executable,
+                                                     List<AbstractExecutable> tasks) {
+        if (!state.isFinalState() && state != ExecutableState.ERROR) {
+            logger.info("state: {} is not right,not need to notify users", state);
+            return null;
+        }
+        logger.info("notify on execute change state: {}", state);
+        String states = checkOverrideConfig(executable.getProject(),
+                NonCustomProjectLevelConfig.JOB_NOTIFICATION_ENABLED_STATES.getValue());
+        String[] notificationStates = states == null ? executable.getConfig().getJobNotificationStates()
+                : StringUtils.split(states, ",");
+
+        if (notificationStates.length < 1 || !Arrays.asList(notificationStates).contains(state.toStringState())) {
+            logger.info("state: {} is not set,not need to notify users", state);
+            return null;
+        }
+
+        Map<String, Object> dataMap = getDataMap(executable);
+        dataMap.put("source_byte_size", String.valueOf(executable.getByteSize()));
+        dataMap.put("start_time", new Date(executable.getStartTime()).toString());
+        dataMap.put("duration", executable.getDuration() / 60000 + "mins");
+        dataMap.put("last_update_time", new Date(executable.getLastModified()).toString());
+
+        if (state == ExecutableState.ERROR) {
+            // The job output is only needed for the error mail, so it is looked up here rather than
+            // up front, where it was fetched even when no mail was going to be sent.
+            final Output output = executable.getManager().getOutput(executable.getId());
+            checkErrorTask(executable, dataMap, tasks);
+            dataMap.put("error_log",
+                    Matcher.quoteReplacement(StringUtil.noBlank(output.getShortErrMsg(), "no error message")));
+        }
+
+        return Pair.newPair(getMailTitle(state, executable), getMailContent(state, dataMap));
+    }
+
+    /**
+     * Records the failing step of {@code executable} in {@code dataMap}.
+     *
+     * <p>The first task whose output state is ERROR supplies the "error_step" name and the MR job
+     * id; when no task is in ERROR both entries are set to {@code MailNotificationUtil.NA}.
+     */
+    private static void checkErrorTask(AbstractExecutable executable, Map<String, Object> dataMap,
+                                       List<AbstractExecutable> tasks) {
+        AbstractExecutable errorTask = null;
+        Output errorOutput = null;
+        for (AbstractExecutable task : tasks) {
+            errorOutput = executable.getManager().getOutput(task.getId());
+            if (errorOutput.getState() == ExecutableState.ERROR) {
+                errorTask = task;
+                break;
+            }
+        }
 
-import lombok.Getter;
-import lombok.Setter;
-
-@Getter
-@Setter
-public class EmailNotificationContent extends BasicEmailNotificationContent {
-
-    private AbstractExecutable executable;
-
-    public static EmailNotificationContent createContent(JobIssueEnum issue, AbstractExecutable executable) {
-        EmailNotificationContent content = new EmailNotificationContent();
-        content.setIssue(issue.getDisplayName());
-        content.setTime(LocalDate.now(Clock.systemDefaultZone()).toString());
-        content.setJobType(executable.getJobType().toString());
-        content.setProject(executable.getProject());
-        content.setExecutable(executable);
-        switch (issue) {
-        case JOB_ERROR:
-            content.setConclusion(CONCLUSION_FOR_JOB_ERROR);
-            content.setSolution(SOLUTION_FOR_JOB_ERROR);
-            break;
-        case LOAD_EMPTY_DATA:
-            content.setConclusion(CONCLUSION_FOR_LOAD_EMPTY_DATA);
-            content.setSolution(
-                    SOLUTION_FOR_LOAD_EMPTY_DATA.replaceAll("\\$\\{model_name\\}", executable.getTargetModelAlias()));
-            break;
-        case SOURCE_RECORDS_CHANGE:
-            content.setConclusion(CONCLUSION_FOR_SOURCE_RECORDS_CHANGE);
-            content.setSolution(SOLUTION_FOR_SOURCE_RECORDS_CHANGE
-                    .replaceAll("\\$\\{start_time\\}",
-                            DateFormat.formatToDateStr(executable.getDataRangeStart(),
-                                    DateFormat.DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS))
-                    .replaceAll("\\$\\{end_time\\}", DateFormat.formatToDateStr(executable.getDataRangeEnd(),
-                            DateFormat.DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS)));
-            break;
-        default:
-            throw new IllegalArgumentException(String.format(Locale.ROOT, "no process for jobIssue: %s.", issue));
+        if (errorTask == null) {
+            logger.info("None of the sub tasks of cubing job {} is error,"
+                    + " and this job should become success or in unit test env", executable.getId());
+            dataMap.put("error_step", MailNotificationUtil.NA);
+            dataMap.put(ExecutableConstants.MR_JOB_ID, MailNotificationUtil.NA);
+        } else {
+            dataMap.put("error_step", errorTask.getName());
+            // NOTE(review): the key is tested on errorTask.getOutput() but read from errorOutput
+            // (fetched through the job's manager); presumably both are the same output - confirm.
+            if (errorTask.getOutput().getExtra().containsKey(ExecutableConstants.MR_JOB_ID)) {
+                final String mrJobId = errorOutput.getExtra().get(ExecutableConstants.MR_JOB_ID);
+                dataMap.put(ExecutableConstants.MR_JOB_ID, StringUtil.noBlank(mrJobId, "Not initialized"));
+            } else {
+                dataMap.put(ExecutableConstants.MR_JOB_ID, MailNotificationUtil.NA);
+            }
         }
-        return content;
     }
+
+    /**
+     * Builds the mail reporting a metadata persist failure of {@code executable}.
+     *
+     * <p>The exception message becomes the "error_log" template value; blank title parts are
+     * replaced by {@code MailNotificationUtil.NA}.
+     *
+     * @return the mail as a (title, content) pair
+     */
+    public static Pair<String, String> createMetadataPersistExceptionContent(Throwable exception,
+                                                                             AbstractExecutable executable) {
+        logger.info("notify on metadata persist exception: {}", exception.getMessage());
+
+        final Map<String, Object> templateData = getDataMap(executable);
+        final String errorLog = StringUtil.noBlank(exception.getMessage(), "no error message");
+        templateData.put("error_log", Matcher.quoteReplacement(errorLog));
+        final String content = MailNotificationUtil.getMailContent(MailNotificationUtil.METADATA_PERSIST_FAIL,
+                templateData);
+
+        final String deployEnv = StringUtil.noBlank(executable.getConfig().getDeployEnv(), MailNotificationUtil.NA);
+        final String project = executable.getProject();
+        final String subjectAlias = StringUtil.noBlank(executable.getTargetSubjectAlias(), MailNotificationUtil.NA);
+        final String title = MailNotificationUtil.getMailTitle("METADATA_PERSIST", "FAIL", deployEnv, project,
+                subjectAlias);
+        return Pair.newPair(title, content);
+    }
+
+    /**
+     * Collects the template values shared by the notification mails about {@code executable}.
+     * Blank job name, submitter and model name are replaced by a "missing ..." placeholder.
+     */
+    private static Map<String, Object> getDataMap(AbstractExecutable executable) {
+        final String jobName = StringUtil.noBlank(executable.getName(), "missing job_name");
+        final String envName = executable.getConfig().getDeployEnv();
+        final String submitter = StringUtil.noBlank(executable.getSubmitter(), "missing submitter");
+        final String jobEngine = MailNotificationUtil.getLocalHostName();
+        final String projectName = executable.getProject();
+        final String modelName = StringUtil.noBlank(executable.getTargetModelAlias(), "missing model_name");
+
+        final Map<String, Object> templateData = Maps.newHashMap();
+        templateData.put("job_name", jobName);
+        templateData.put("env_name", envName);
+        templateData.put("submitter", submitter);
+        templateData.put("job_engine", jobEngine);
+        templateData.put("project_name", projectName);
+        templateData.put("model_name", modelName);
+        return templateData;
+    }
+
+    /** Tells whether {@code jobIssue} is LOAD_EMPTY_DATA or SOURCE_RECORDS_CHANGE, the issues that are mailed. */
+    private static boolean checkState(JobIssueEnum jobIssue) {
+        // Enum constants are singletons, so identity comparison matches equals() here.
+        return jobIssue == JobIssueEnum.LOAD_EMPTY_DATA || jobIssue == JobIssueEnum.SOURCE_RECORDS_CHANGE;
+    }
+
+    /** Returns the mail body for a job state; delegates to {@code MailNotificationUtil.getMailContent}. */
+    private static String getMailContent(ExecutableState state, Map<String, Object> dataMap) {
+        return MailNotificationUtil.getMailContent(state, dataMap);
+    }
+
+    /** Returns the mail body for a job issue; delegates to {@code MailNotificationUtil.getMailContent}. */
+    private static String getMailContent(JobIssueEnum jobIssueEnum, Map<String, Object> dataMap) {
+        return MailNotificationUtil.getMailContent(jobIssueEnum, dataMap);
+    }
+
+
+    /**
+     * Builds the mail title for a job state from "JOB", the state, the metadata URL prefix, the
+     * deploy environment, the project and the target subject alias (empty when absent).
+     */
+    private static String getMailTitle(ExecutableState state, AbstractExecutable executable) {
+        String subjectAlias = executable.getTargetSubjectAlias();
+        if (subjectAlias == null) {
+            subjectAlias = "";
+        }
+        return MailNotificationUtil.getMailTitle("JOB", state.toString(),
+                executable.getConfig().getMetadataUrlPrefix(), executable.getConfig().getDeployEnv(),
+                executable.getProject(), subjectAlias);
+    }
+
+    /**
+     * Builds the mail title for a job issue from "JOB", the issue's display name, the metadata URL
+     * prefix, the deploy environment, the project and the target subject alias (empty when absent).
+     */
+    private static String getMailTitle(JobIssueEnum issue, AbstractExecutable executable) {
+        String subjectAlias = executable.getTargetSubjectAlias();
+        if (subjectAlias == null) {
+            subjectAlias = "";
+        }
+        return MailNotificationUtil.getMailTitle("JOB", issue.getDisplayName(),
+                executable.getConfig().getMetadataUrlPrefix(), executable.getConfig().getDeployEnv(),
+                executable.getProject(), subjectAlias);
+    }
+
+    /**
+     * Looks up a project-level configuration override.
+     *
+     * @param project name of the project whose overrides are consulted
+     * @param overrideNotificationName key of the configuration entry
+     * @return the overriding value, or {@code null} when the project is unknown or has no such entry
+     */
+    public static String checkOverrideConfig(String project, String overrideNotificationName) {
+        NProjectManager projectManager = NProjectManager.getInstance(KylinConfig.getInstanceFromEnv());
+        ProjectInstance projectInstance = projectManager.getProject(project);
+        if (projectInstance == null) {
+            // No such project: report "no override" so callers fall back to the global
+            // configuration instead of failing with a NullPointerException.
+            return null;
+        }
+        return projectInstance.getOverrideKylinProps().get(overrideNotificationName);
+    }
+
+
 }
diff --git a/src/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableState.java b/src/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableState.java
index b10bb311b1..9016ee8a40 100644
--- a/src/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableState.java
+++ b/src/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableState.java
@@ -119,4 +119,17 @@ public enum ExecutableState {
             throw new RuntimeException("invalid state:" + this);
         }
     }
+
+    /**
+     * Returns the name under which this state appears in the job notification settings.
+     *
+     * @throws IllegalStateException for states that have no notification name
+     */
+    public String toStringState() {
+        switch (this) {
+            case SUCCEED:
+                return "Succeed";
+            case ERROR:
+                return "Error";
+            case DISCARDED:
+                return "Discard";
+            case SUICIDAL:
+                // Jobs are finished with SUICIDAL as well; without a name here the notification
+                // lookup for such a job would throw instead of simply finding no match.
+                return "Suicidal";
+            default:
+                throw new IllegalStateException("invalid Executable state:" + this);
+        }
+    }
 }
diff --git a/src/core-job/src/main/java/org/apache/kylin/job/execution/NExecutableManager.java b/src/core-job/src/main/java/org/apache/kylin/job/execution/NExecutableManager.java
index a161751468..70b4fc1b84 100644
--- a/src/core-job/src/main/java/org/apache/kylin/job/execution/NExecutableManager.java
+++ b/src/core-job/src/main/java/org/apache/kylin/job/execution/NExecutableManager.java
@@ -79,6 +79,7 @@ import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.dao.ExecutableOutputPO;
 import org.apache.kylin.job.dao.ExecutablePO;
 import org.apache.kylin.job.dao.NExecutableDao;
+import org.apache.kylin.job.exception.PersistentException;
 import org.apache.kylin.job.impl.threadpool.NDefaultScheduler;
 import org.apache.kylin.metadata.cube.model.NBatchConstants;
 import org.apache.kylin.metadata.cube.model.NDataSegment;
@@ -1507,7 +1508,7 @@ public class NExecutableManager {
                 || to == ExecutableState.ERROR || to == ExecutableState.SUICIDAL;
     }
 
-    public void updateJobOutputToHDFS(String resPath, ExecutableOutputPO obj) {
+    public void updateJobOutputToHDFS(String resPath, ExecutableOutputPO obj) throws PersistentException {
         DataOutputStream dout = null;
         try {
             Path path = new Path(resPath);
@@ -1516,7 +1517,7 @@ public class NExecutableManager {
             JsonUtil.writeValue(dout, obj);
         } catch (Exception e) {
             // the operation to update output to hdfs failed, next task should not be interrupted.
-            logger.error("update job output [{}] to HDFS failed.", resPath, e);
+            throw new PersistentException("update job output: " + resPath + " to HDFS failed", e);
         } finally {
             IOUtils.closeQuietly(dout);
         }
diff --git a/src/core-job/src/main/java/org/apache/kylin/job/util/MailNotificationUtil.java b/src/core-job/src/main/java/org/apache/kylin/job/util/MailNotificationUtil.java
new file mode 100644
index 0000000000..2fd86d0aaa
--- /dev/null
+++ b/src/core-job/src/main/java/org/apache/kylin/job/util/MailNotificationUtil.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.job.util;
+
+import com.google.common.base.Joiner;
+import org.apache.kylin.common.util.MailTemplateProvider;
+import org.apache.kylin.job.constant.JobIssueEnum;
+import org.apache.kylin.job.execution.ExecutableState;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Map;
+
+/**
+ * Static helpers for job notification mails: template keys, titles and rendered content.
+ */
+public class MailNotificationUtil {
+    public static final String JOB_ERROR = "JOB_ERROR";
+    public static final String JOB_DISCARD = "JOB_DISCARD";
+    public static final String JOB_SUCCEED = "JOB_SUCCEED";
+    public static final String JOB_SUICIDAL = "JOB_SUICIDAL";
+    public static final String JOB_LOAD_EMPTY_DATA = "LOAD_EMPTY_DATA";
+    public static final String JOB_SOURCE_RECORDS_CHANGE = "SOURCE_RECORDS_CHANGE";
+    public static final String METADATA_PERSIST_FAIL = "METADATA_PERSIST_FAIL";
+
+    public static final String NA = "NA";
+
+    /** Canonical name of the local host, resolved once when the class is loaded. */
+    private static final String LOCAL_HOST_NAME = resolveLocalHostName();
+
+    private MailNotificationUtil() {
+        throw new IllegalStateException("Class MailNotificationUtil is an utility class !");
+    }
+
+    /** Resolves the canonical local host name, falling back to "UNKNOWN" when it cannot be determined. */
+    private static String resolveLocalHostName() {
+        try {
+            return InetAddress.getLocalHost().getCanonicalHostName();
+        } catch (UnknownHostException e) {
+            return "UNKNOWN";
+        }
+    }
+
+    /** Maps a job state to its mail template key; states without a template yield {@code null}. */
+    private static String getMailTemplateKey(ExecutableState state) {
+        final String templateKey;
+        switch (state) {
+        case ERROR:
+            templateKey = JOB_ERROR;
+            break;
+        case DISCARDED:
+            templateKey = JOB_DISCARD;
+            break;
+        case SUCCEED:
+            templateKey = JOB_SUCCEED;
+            break;
+        case SUICIDAL:
+            templateKey = JOB_SUICIDAL;
+            break;
+        default:
+            templateKey = null;
+            break;
+        }
+        return templateKey;
+    }
+
+    /** Maps a job issue to its mail template key; issues without a template yield {@code null}. */
+    private static String getMailTemplateKey(JobIssueEnum jobIssue) {
+        final String templateKey;
+        switch (jobIssue) {
+        case LOAD_EMPTY_DATA:
+            templateKey = JOB_LOAD_EMPTY_DATA;
+            break;
+        case SOURCE_RECORDS_CHANGE:
+            templateKey = JOB_SOURCE_RECORDS_CHANGE;
+            break;
+        default:
+            templateKey = null;
+            break;
+        }
+        return templateKey;
+    }
+
+    public static String getLocalHostName() {
+        return LOCAL_HOST_NAME;
+    }
+
+    /** Returns the mail body produced by the template provider for the template of {@code state}. */
+    public static String getMailContent(ExecutableState state, Map<String, Object> dataMap) {
+        final MailTemplateProvider provider = MailTemplateProvider.getInstance();
+        return provider.buildMailContent(MailNotificationUtil.getMailTemplateKey(state), dataMap);
+    }
+
+    /** Returns the mail body produced by the template provider for the template of {@code jobIssue}. */
+    public static String getMailContent(JobIssueEnum jobIssue, Map<String, Object> dataMap) {
+        final MailTemplateProvider provider = MailTemplateProvider.getInstance();
+        return provider.buildMailContent(MailNotificationUtil.getMailTemplateKey(jobIssue), dataMap);
+    }
+
+    /** Returns the mail body produced by the template provider for the template named {@code key}. */
+    public static String getMailContent(String key, Map<String, Object> dataMap) {
+        return MailTemplateProvider.getInstance().buildMailContent(key, dataMap);
+    }
+
+    /** Joins the parts into a title of the form {@code [part1]-[part2]-...-[partN]}. */
+    public static String getMailTitle(String... titleParts) {
+        final String joinedParts = Joiner.on("]-[").join(titleParts);
+        return "[" + joinedParts + "]";
+    }
+
+}
diff --git a/src/core-job/src/main/resources/mail_templates/JOB_DISCARD.ftl b/src/core-job/src/main/resources/mail_templates/JOB_DISCARD.ftl
new file mode 100644
index 0000000000..1fe1dff080
--- /dev/null
+++ b/src/core-job/src/main/resources/mail_templates/JOB_DISCARD.ftl
@@ -0,0 +1,274 @@
+<!--
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+-->
+<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN"
+        "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
+<html xmlns="http://www.w3.org/1999/xhtml">
+
+<head>
+    <meta http-equiv="Content-Type" content="text/html; charset=UTF-8"/>
+    <meta name="viewport" content="width=device-width, initial-scale=1.0"/>
+</head>
+
+<style>
+    html {
+        font-size: 10px;
+    }
+
+    * {
+        box-sizing: border-box;
+    }
+
+    a:hover,
+    a:focus {
+        color: #23527c;
+        text-decoration: underline;
+    }
+
+    a:focus {
+        outline: 5px auto -webkit-focus-ring-color;
+        outline-offset: -2px;
+    }
+</style>
+
+<body>
+<div style="margin-left:5%;margin-right:5%;font-family: 'Trebuchet MS ', Arial, Helvetica, sans-serif;">
+<span style="
+    line-height: 1.1;font-size: 18px;">
+    <p style="text-align:left;">Dear Kylin5 user,</p>
+    <p>It's a pity that the job is discarded. Thank you for using Kylin.</p>
+</span>
+    <hr style="margin-top: 20px;
+    margin-bottom: 20px;
+    border: 0;
+    border-top: 1px solid #eee;">
+    <h1>
+    <span style="display: inline;
+            background-color: #607D8B;
+            color: #fff;
+            line-height: 1;
+            font-weight: 700;
+            font-size:36px;
+            text-align: center;">&nbsp;Discarded&nbsp;</span>
+    </h1>
+    <hr style="margin-top: 20px;
+            margin-bottom: 20px;
+            border: 0;
+            border-top: 1px solid #eee;">
+    <table cellpadding="0" cellspacing="0" width="100%" style="border-collapse: collapse;border:1px solid #f8f8f8">
+
+        <tr>
+
+            <td style="padding: 10px 15px;
+                    background-color: #eeeeee;
+                    border:1px solid #f8f8f8;">
+                <h4 style="margin-top: 0;
+                        margin-bottom: 0;
+                        font-size: 16px;
+                        color: inherit;
+                        color: #404040;
+                        font-family: 'Trebuchet MS ', Arial, Helvetica, sans-serif;">
+                ${job_name}
+                </h4>
+            </td>
+        </tr>
+
+        <tr>
+
+            <td style="padding: 10px 15px;
+                    background-color: #eeeeee;
+                    border:1px solid #f8f8f8;">
+                <h4 style="margin-top: 0;
+                        margin-bottom: 0;
+                        font-size: 16px;
+                        color: inherit;
+                        color: #404040;
+                        font-family: 'Trebuchet MS ', Arial, Helvetica, sans-serif;">
+                ${env_name}
+                </h4>
+            </td>
+        </tr>
+
+        <tr>
+
+            <td style="padding: 15px;">
+                <table cellpadding="0" cellspacing="0" width="100%"
+                       style="margin-bottom: 20px;border:1px solid #ddd;border-collapse: collapse;font-family: 'Trebuchet MS ', Arial, Helvetica, sans-serif;">
+                    <tr>
+                        <th width="30%" style="padding: 8px;
+                                            line-height: 1.42857143;
+                                            vertical-align: top;
+                                            border: 1px solid #ddd;
+                                            text-align: left;
+                                            font-size: medium;
+                                            font-style: normal;">Submitter
+                        </th>
+                        <td style="padding: 8px;
+                                line-height: 1.42857143;
+                                vertical-align: top;
+                                border: 1px solid #ddd;
+                                font-size: medium;
+                                font-style: normal;">
+                        ${submitter}
+                        </td>
+                    </tr>
+                    <tr>
+                        <th width="30%" style="padding: 8px;
+                                            line-height: 1.42857143;
+                                            vertical-align: top;
+                                            border: 1px solid #ddd;
+                                            text-align: left;
+                                            font-size: medium;
+                                            font-style: normal;">Job Engine
+                        </th>
+                        <td style="padding: 8px;
+                                line-height: 1.42857143;
+                                vertical-align: top;
+                                border: 1px solid #ddd;
+                                font-size: medium;
+                                font-style: normal;">
+                        ${job_engine}
+                        </td>
+                    </tr>
+                    <tr>
+                        <th width="30%" style="padding: 8px;
+                                            line-height: 1.42857143;
+                                            vertical-align: top;
+                                            border: 1px solid #ddd;
+                                            text-align: left;
+                                            font-size: medium;
+                                            font-style: normal;">Project
+                        </th>
+                        <td style="padding: 8px;
+                                line-height: 1.42857143;
+                                vertical-align: top;
+                                border: 1px solid #ddd;
+                                font-size: medium;
+                                font-style: normal;">
+                        ${project_name}
+                        </td>
+                    </tr>
+                    <tr>
+                        <th width="30%" style="padding: 8px;
+                                            line-height: 1.42857143;
+                                            vertical-align: top;
+                                            border: 1px solid #ddd;
+                                            text-align: left;
+                                            font-size: medium;
+                                            font-style: normal;">Model Name
+                        </th>
+                        <td style="padding: 8px;
+                                line-height: 1.42857143;
+                                vertical-align: top;
+                                border: 1px solid #ddd;
+                                font-size: medium;
+                                font-style: normal;">
+                        ${model_name}
+                        </td>
+                    </tr>
+                    <tr>
+                        <th width="30%" style="padding: 8px;
+                                            line-height: 1.42857143;
+                                            vertical-align: top;
+                                            border: 1px solid #ddd;
+                                            text-align: left;
+                                            font-size: medium;
+                                            font-style: normal;">
+                            Source Byte Size
+                        </th>
+                        <td style="padding: 8px;
+                                line-height: 1.42857143;
+                                vertical-align: top;
+                                border: 1px solid #ddd;
+                                font-size: medium;
+                                font-style: normal;">
+                        ${source_byte_size}
+                        </td>
+                    </tr>
+                    <tr>
+                        <th width="30%" style="padding: 8px;
+                                            line-height: 1.42857143;
+                                            vertical-align: top;
+                                            border: 1px solid #ddd;
+                                            text-align: left;
+                                            font-size: medium;
+                                            font-style: normal;">Start Time
+                        </th>
+                        <td style="padding: 8px;
+                                line-height: 1.42857143;
+                                vertical-align: top;
+                                border: 1px solid #ddd;
+                                font-size: medium;
+                                font-style: normal;">
+                        ${start_time}
+                        </td>
+                    </tr>
+                    <tr>
+                        <th width="30%" style="padding: 8px;
+                                            line-height: 1.42857143;
+                                            vertical-align: top;
+                                            border: 1px solid #ddd;
+                                            text-align: left;
+                                            font-size: medium;
+                                            font-style: normal;">
+                            Duration
+                        </th>
+                        <td style="padding: 8px;
+                                line-height: 1.42857143;
+                                vertical-align: top;
+                                border: 1px solid #ddd;
+                                font-size: medium;
+                                font-style: normal;">
+                        ${duration}
+                        </td>
+                    </tr>
+                    <tr>
+                        <th width="30%" style="padding: 8px;
+                                            line-height: 1.42857143;
+                                            vertical-align: top;
+                                            border: 1px solid #ddd;
+                                            text-align: left;
+                                            font-size: medium;
+                                            font-style: normal;">
+                            Last Update Time
+                        </th>
+                        <td style="padding: 8px;
+                                line-height: 1.42857143;
+                                vertical-align: top;
+                                border: 1px solid #ddd;
+                                font-size: medium;
+                                font-style: normal;">
+                        ${last_update_time}
+                        </td>
+                    </tr>
+                </table>
+            </td>
+        </tr>
+    </table>
+    <hr style="margin-top: 20px;
+    margin-bottom: 20px;
+    border: 0;
+    border-top: 1px solid #eee;">
+    <h4 style="font-weight: 500;
+    line-height: 1.1;font-size:18px;">
+        <p>Best Wishes!</p>
+        <p style="margin: 0 0 10px;"><b>Kylin Team</b></p>
+    </h4>
+</div>
+</body>
+
+</html>
\ No newline at end of file
diff --git a/src/core-job/src/main/resources/mail_templates/JOB_ERROR.ftl b/src/core-job/src/main/resources/mail_templates/JOB_ERROR.ftl
new file mode 100644
index 0000000000..65733d12ec
--- /dev/null
+++ b/src/core-job/src/main/resources/mail_templates/JOB_ERROR.ftl
@@ -0,0 +1,390 @@
+<!--
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+-->
+<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN"
+        "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
+<html xmlns="http://www.w3.org/1999/xhtml">
+
+<head>
+    <meta http-equiv="Content-Type" content="text/html; charset=UTF-8"/>
+    <meta name="viewport" content="width=device-width, initial-scale=1.0"/>
+</head>
+
+<style>
+    html {
+        font-size: 10px;
+    }
+
+    * {
+        box-sizing: border-box;
+    }
+
+    a:hover,
+    a:focus {
+        color: #23527c;
+        text-decoration: underline;
+    }
+
+    a:focus {
+        outline: 5px auto -webkit-focus-ring-color;
+        outline-offset: -2px;
+    }
+</style>
+
+<body>
+<div style="margin-left:5%;margin-right:5%;font-family: 'Trebuchet MS ', Arial, Helvetica, sans-serif;">
+<span style="
+    line-height: 1.1;font-size: 18px;">
+    <p style="text-align:left;">Dear Kylin5 user,</p>
+    <p>This job of building model <strong>failed</strong>.</p>
+    <p>Please check the error log and try again, or you can contact the Kylin team.</p>
+    <p>Thank you for using Kylin and we apologize for the inconvenience.</p>
+</span>
+    <hr style="margin-top: 20px;
+    margin-bottom: 20px;
+    border: 0;
+    border-top: 1px solid #eee;">
+    <h1>
+    <span style="display: inline;
+            background-color: #d9534f;
+            color: #fff;
+            line-height: 1;
+            font-weight: 700;
+            font-size:36px;
+            text-align: center;">&nbsp;Error&nbsp;</span>
+    </h1>
+    <hr style="margin-top: 20px;
+            margin-bottom: 20px;
+            border: 0;
+            border-top: 1px solid #eee;">
+
+    <table cellpadding="0" cellspacing="0" width="100%" style="border-collapse: collapse;border:1px solid #ebccd1;">
+
+        <tr>
+
+            <td style="padding: 10px 15px;
+                    background-color: #f2dede;
+                    border:1px solid #ebccd1;">
+                <h4 style="margin-top: 0;
+                        margin-bottom: 0;
+                        font-size: 16px;
+                        color: inherit;
+                        color: #a94442;
+                        font-family: 'Trebuchet MS ', Arial, Helvetica, sans-serif;">
+                ${job_name}
+                </h4>
+            </td>
+        </tr>
+        <tr>
+
+            <td style="padding: 10px 15px;
+                    background-color: #f2dede;
+                    border:1px solid #ebccd1;">
+                <h4 style="margin-top: 0;
+                        margin-bottom: 0;
+                        font-size: 16px;
+                        color: inherit;
+                        color: #a94442;
+                        font-family: 'Trebuchet MS ', Arial, Helvetica, sans-serif;">
+                ${env_name}
+                </h4>
+            </td>
+        </tr>
+        <tr>
+
+            <td style="padding: 15px;">
+                <table cellpadding="0" cellspacing="0" width="100%"
+                       style="margin-bottom: 20px;border:1px solid #ddd;border-collapse: collapse;font-family: 'Trebuchet MS ', Arial, Helvetica, sans-serif;">
+                    <tr>
+                        <th width="30%" style="padding: 8px;
+                                            line-height: 1.42857143;
+                                            vertical-align: top;
+                                            border: 1px solid #ddd;
+                                            text-align: left;
+                                            font-size: medium;
+                                            font-style: normal;">Submitter
+                        </th>
+                        <td style="padding: 8px;
+                                line-height: 1.42857143;
+                                vertical-align: top;
+                                border: 1px solid #ddd;
+                                font-size: medium;
+                                font-style: normal;">
+                        ${submitter}
+                        </td>
+                    </tr>
+                    <tr>
+                        <th width="30%" style="padding: 8px;
+                                            line-height: 1.42857143;
+                                            vertical-align: top;
+                                            border: 1px solid #ddd;
+                                            text-align: left;
+                                            font-size: medium;
+                                            font-style: normal;">Job Engine
+                        </th>
+                        <td style="padding: 8px;
+                                line-height: 1.42857143;
+                                vertical-align: top;
+                                border: 1px solid #ddd;
+                                font-size: medium;
+                                font-style: normal;">
+                        ${job_engine}
+                        </td>
+                    </tr>
+                    <tr>
+                        <th width="30%" style="padding: 8px;
+                                            line-height: 1.42857143;
+                                            vertical-align: top;
+                                            border: 1px solid #ddd;
+                                            text-align: left;
+                                            font-size: medium;
+                                            font-style: normal;">Project
+                        </th>
+                        <td style="padding: 8px;
+                                line-height: 1.42857143;
+                                vertical-align: top;
+                                border: 1px solid #ddd;
+                                font-size: medium;
+                                font-style: normal;">
+                        ${project_name}
+                        </td>
+                    </tr>
+                    <tr>
+                        <th width="30%" style="padding: 8px;
+                                            line-height: 1.42857143;
+                                            vertical-align: top;
+                                            border: 1px solid #ddd;
+                                            text-align: left;
+                                            font-size: medium;
+                                            font-style: normal;">Model Name
+                        </th>
+                        <td style="padding: 8px;
+                                line-height: 1.42857143;
+                                vertical-align: top;
+                                border: 1px solid #ddd;
+                                font-size: medium;
+                                font-style: normal;">
+                        ${model_name}
+                        </td>
+                    </tr>
+                    <tr>
+                        <th width="30%" style="padding: 8px;
+                                            line-height: 1.42857143;
+                                            vertical-align: top;
+                                            border: 1px solid #ddd;
+                                            text-align: left;
+                                            font-size: medium;
+                                            font-style: normal;">
+                            Source Byte Size
+                        </th>
+                        <td style="padding: 8px;
+                                line-height: 1.42857143;
+                                vertical-align: top;
+                                border: 1px solid #ddd;
+                                font-size: medium;
+                                font-style: normal;">
+                        ${source_byte_size}
+                        </td>
+                    </tr>
+                    <tr>
+                        <th width="30%" style="padding: 8px;
+                                            line-height: 1.42857143;
+                                            vertical-align: top;
+                                            border: 1px solid #ddd;
+                                            text-align: left;
+                                            font-size: medium;
+                                            font-style: normal;">Start Time
+                        </th>
+                        <td style="padding: 8px;
+                                line-height: 1.42857143;
+                                vertical-align: top;
+                                border: 1px solid #ddd;
+                                font-size: medium;
+                                font-style: normal;">
+                        ${start_time}
+                        </td>
+                    </tr>
+                    <tr>
+                        <th width="30%" style="padding: 8px;
+                                            line-height: 1.42857143;
+                                            vertical-align: top;
+                                            border: 1px solid #ddd;
+                                            text-align: left;
+                                            font-size: medium;
+                                            font-style: normal;">
+                            Duration
+                        </th>
+                        <td style="padding: 8px;
+                                line-height: 1.42857143;
+                                vertical-align: top;
+                                border: 1px solid #ddd;
+                                font-size: medium;
+                                font-style: normal;">
+                        ${duration}
+                        </td>
+                    </tr>
+                    <tr>
+                        <th width="30%" style="padding: 8px;
+                                            line-height: 1.42857143;
+                                            vertical-align: top;
+                                            border: 1px solid #ddd;
+                                            text-align: left;
+                                            font-size: medium;
+                                            font-style: normal;">MR Waiting Time
+                        </th>
+                        <td style="padding: 8px;
+                                line-height: 1.42857143;
+                                vertical-align: top;
+                                border: 1px solid #ddd;
+                                font-size: medium;
+                                font-style: normal;">
+                        NA
+                        </td>
+                    </tr>
+                    <tr>
+                        <th width="30%" style="padding: 8px;
+                                            line-height: 1.42857143;
+                                            vertical-align: top;
+                                            border: 1px solid #ddd;
+                                            text-align: left;
+                                            font-size: medium;
+                                            font-style: normal;">
+                            Last Update Time
+                        </th>
+                        <td style="padding: 8px;
+                                line-height: 1.42857143;
+                                vertical-align: top;
+                                border: 1px solid #ddd;
+                                font-size: medium;
+                                font-style: normal;">
+                        ${last_update_time}
+                        </td>
+                    </tr>
+                </table>
+            </td>
+        </tr>
+
+        <tr>
+
+            <td style="padding: 10px 15px;
+                    background-color: #f2dede;
+                    border:1px solid #ebccd1;">
+                <h4 style="margin-top: 0;
+                        margin-bottom: 0;
+                        font-size: 16px;
+                        color: inherit;
+                        color: #a94442;
+                        font-family: 'Trebuchet MS ', Arial, Helvetica, sans-serif;">
+                    Job Error Details
+                </h4>
+            </td>
+        </tr>
+        <tr>
+
+            <td style="padding: 15px;">
+                <table cellpadding="0" cellspacing="0" width="100%"
+                       style="margin-bottom: 20px;border:1px solid #ddd;border-collapse: collapse;font-family: 'Trebuchet MS ', Arial, Helvetica, sans-serif;">
+                    <tr>
+                        <th width="30%" style="padding: 8px;
+                                            line-height: 1.42857143;
+                                            vertical-align: top;
+                                            border: 1px solid #ddd;
+                                            text-align: left;
+                                            font-size: medium;
+                                            font-style: normal;">
+                            Error Step
+                        </th>
+                        <td style="padding: 8px;
+                                line-height: 1.42857143;
+                                vertical-align: top;
+                                border: 1px solid #ddd;
+                                font-size: medium;
+                                font-style: normal;">
+                        ${error_step}
+                        </td>
+                    </tr>
+                    <tr>
+                        <th width="30%" style="padding: 8px;
+                                            line-height: 1.42857143;
+                                            vertical-align: top;
+                                            border: 1px solid #ddd;
+                                            text-align: left;
+                                            font-size: medium;
+                                            font-style: normal;">
+                            MR Job
+                        </th>
+                        <td style="padding: 8px;
+                                line-height: 1.42857143;
+                                vertical-align: top;
+                                border: 1px solid #ddd;
+                                font-size: medium;
+                                font-style: normal;">
+                        ${mr_job_id}
+                        </td>
+                    </tr>
+                </table>
+            </td>
+        </tr>
+
+        <tr>
+
+            <td style="padding: 10px 15px;
+                    background-color: #f2dede;
+                    border:1px solid #ebccd1;">
+                <h4 style="margin-top: 0;
+                        margin-bottom: 0;
+                        font-size: 16px;
+                        color: inherit;
+                        color: #a94442;
+                        font-family: 'Trebuchet MS ', Arial, Helvetica, sans-serif;">
+                    Logs
+                </h4>
+            </td>
+        </tr>
+        <tr>
+
+            <td style="padding: 15px;">
+                <table cellpadding="0" cellspacing="0" width="100%"
+                       style="margin-bottom: 20px;border:1px solid #ddd;border-collapse: collapse;font-family: 'Trebuchet MS ', Arial, Helvetica, sans-serif;">
+                    <tr>
+                        <td style="padding: 8px;
+                                line-height: 1.42857143;
+                                vertical-align: top;
+                                border: 1px solid #ddd;
+                                font-size: medium;
+                                font-style: normal;">
+
+                            <pre style="white-space: pre-wrap;">${error_log}</pre>
+                        </td>
+                    </tr>
+                </table>
+            </td>
+        </tr>
+    </table>
+    <hr style="margin-top: 20px;
+    margin-bottom: 20px;
+    border: 0;
+    border-top: 1px solid #eee;">
+    <h4 style="font-weight: 500;
+    line-height: 1.1;font-size:18px;">
+        <p>Best Wishes!</p>
+        <p style="margin: 0 0 10px;"><b>Kylin Team</b></p>
+    </h4>
+</div>
+</body>
+
+</html>
\ No newline at end of file
diff --git a/src/core-job/src/main/resources/mail_templates/JOB_SUCCEED.ftl b/src/core-job/src/main/resources/mail_templates/JOB_SUCCEED.ftl
new file mode 100644
index 0000000000..f793c3e803
--- /dev/null
+++ b/src/core-job/src/main/resources/mail_templates/JOB_SUCCEED.ftl
@@ -0,0 +1,273 @@
+<!--
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+-->
+<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN"
+        "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
+<html xmlns="http://www.w3.org/1999/xhtml">
+
+<head>
+    <meta http-equiv="Content-Type" content="text/html; charset=UTF-8"/>
+    <meta name="viewport" content="width=device-width, initial-scale=1.0"/>
+</head>
+
+<style>
+    html {
+        font-size: 10px;
+    }
+
+    * {
+        box-sizing: border-box;
+    }
+
+    a:hover,
+    a:focus {
+        color: #23527c;
+        text-decoration: underline;
+    }
+
+    a:focus {
+        outline: 5px auto -webkit-focus-ring-color;
+        outline-offset: -2px;
+    }
+</style>
+
+<body>
+<div style="margin-left:5%;margin-right:5%;font-family: 'Trebuchet MS ', Arial, Helvetica, sans-serif;">
+<span style="line-height: 1.1;font-size: 18px;">
+    <p style="text-align:left;">Dear Kylin5 user,</p>
+    <p>Congratulations! Please feel free to query based on the Kylin model. Thank you for using Kylin.</p>
+</span>
+    <hr style="margin-top: 20px;
+            margin-bottom: 20px;
+            border: 0;
+            border-top: 1px solid #eee;">
+    <h1>
+    <span style="display: inline;
+            background-color: #5cb85c;
+            color: #fff;
+            line-height: 1;
+            font-weight: 700;
+            font-size:36px;
+            text-align: center;">&nbsp;Succeed&nbsp;</span>
+    </h1>
+    <hr style="margin-top: 20px;
+            margin-bottom: 20px;
+            border: 0;
+            border-top: 1px solid #eee;">
+    <table cellpadding="0" cellspacing="0" width="100%" style="border-collapse: collapse;border:1px solid #d6e9c6;">
+
+        <tr>
+
+            <td style="padding: 10px 15px;
+                    background-color: #dff0d8;
+                    border:1px solid #d6e9c6;">
+                <h4 style="margin-top: 0;
+                        margin-bottom: 0;
+                        font-size: 16px;
+                        color: inherit;
+                        color: #3c763d;
+                        font-family: 'Trebuchet MS ', Arial, Helvetica, sans-serif;">
+                ${job_name}
+                </h4>
+            </td>
+        </tr>
+
+        <tr>
+
+            <td style="padding: 10px 15px;
+                    background-color: #dff0d8;
+                    border:1px solid #d6e9c6;">
+                <h4 style="margin-top: 0;
+                        margin-bottom: 0;
+                        font-size: 16px;
+                        color: inherit;
+                        color: #3c763d;
+                        font-family: 'Trebuchet MS ', Arial, Helvetica, sans-serif;">
+                ${env_name}
+                </h4>
+            </td>
+        </tr>
+
+        <tr>
+
+            <td style="padding: 15px;">
+                <table cellpadding="0" cellspacing="0" width="100%"
+                       style="margin-bottom: 20px;border:1px solid #ddd;border-collapse: collapse;font-family: 'Trebuchet MS ', Arial, Helvetica, sans-serif;">
+                    <tr>
+                        <th width="30%" style="padding: 8px;
+                                            line-height: 1.42857143;
+                                            vertical-align: top;
+                                            border: 1px solid #ddd;
+                                            text-align: left;
+                                            font-size: medium;
+                                            font-style: normal;">Submitter
+                        </th>
+                        <td style="padding: 8px;
+                                line-height: 1.42857143;
+                                vertical-align: top;
+                                border: 1px solid #ddd;
+                                font-size: medium;
+                                font-style: normal;">
+                        ${submitter}
+                        </td>
+                    </tr>
+                    <tr>
+                        <th width="30%" style="padding: 8px;
+                                            line-height: 1.42857143;
+                                            vertical-align: top;
+                                            border: 1px solid #ddd;
+                                            text-align: left;
+                                            font-size: medium;
+                                            font-style: normal;">Job Engine
+                        </th>
+                        <td style="padding: 8px;
+                                line-height: 1.42857143;
+                                vertical-align: top;
+                                border: 1px solid #ddd;
+                                font-size: medium;
+                                font-style: normal;">
+                        ${job_engine}
+                        </td>
+                    </tr>
+                    <tr>
+                        <th width="30%" style="padding: 8px;
+                                            line-height: 1.42857143;
+                                            vertical-align: top;
+                                            border: 1px solid #ddd;
+                                            text-align: left;
+                                            font-size: medium;
+                                            font-style: normal;">Project
+                        </th>
+                        <td style="padding: 8px;
+                                line-height: 1.42857143;
+                                vertical-align: top;
+                                border: 1px solid #ddd;
+                                font-size: medium;
+                                font-style: normal;">
+                        ${project_name}
+                        </td>
+                    </tr>
+                    <tr>
+                        <th width="30%" style="padding: 8px;
+                                            line-height: 1.42857143;
+                                            vertical-align: top;
+                                            border: 1px solid #ddd;
+                                            text-align: left;
+                                            font-size: medium;
+                                            font-style: normal;">Model Name
+                        </th>
+                        <td style="padding: 8px;
+                                line-height: 1.42857143;
+                                vertical-align: top;
+                                border: 1px solid #ddd;
+                                font-size: medium;
+                                font-style: normal;">
+                        ${model_name}
+                        </td>
+                    </tr>
+                    <tr>
+                        <th width="30%" style="padding: 8px;
+                                            line-height: 1.42857143;
+                                            vertical-align: top;
+                                            border: 1px solid #ddd;
+                                            text-align: left;
+                                            font-size: medium;
+                                            font-style: normal;">
+                            Source Byte Size
+                        </th>
+                        <td style="padding: 8px;
+                                line-height: 1.42857143;
+                                vertical-align: top;
+                                border: 1px solid #ddd;
+                                font-size: medium;
+                                font-style: normal;">
+                        ${source_byte_size}
+                        </td>
+                    </tr>
+                    <tr>
+                        <th width="30%" style="padding: 8px;
+                                            line-height: 1.42857143;
+                                            vertical-align: top;
+                                            border: 1px solid #ddd;
+                                            text-align: left;
+                                            font-size: medium;
+                                            font-style: normal;">Start Time
+                        </th>
+                        <td style="padding: 8px;
+                                line-height: 1.42857143;
+                                vertical-align: top;
+                                border: 1px solid #ddd;
+                                font-size: medium;
+                                font-style: normal;">
+                        ${start_time}
+                        </td>
+                    </tr>
+                    <tr>
+                        <th width="30%" style="padding: 8px;
+                                            line-height: 1.42857143;
+                                            vertical-align: top;
+                                            border: 1px solid #ddd;
+                                            text-align: left;
+                                            font-size: medium;
+                                            font-style: normal;">
+                            Duration
+                        </th>
+                        <td style="padding: 8px;
+                                line-height: 1.42857143;
+                                vertical-align: top;
+                                border: 1px solid #ddd;
+                                font-size: medium;
+                                font-style: normal;">
+                        ${duration}
+                        </td>
+                    </tr>
+                    <tr>
+                        <th width="30%" style="padding: 8px;
+                                            line-height: 1.42857143;
+                                            vertical-align: top;
+                                            border: 1px solid #ddd;
+                                            text-align: left;
+                                            font-size: medium;
+                                            font-style: normal;">
+                            Last Update Time
+                        </th>
+                        <td style="padding: 8px;
+                                line-height: 1.42857143;
+                                vertical-align: top;
+                                border: 1px solid #ddd;
+                                font-size: medium;
+                                font-style: normal;">
+                        ${last_update_time}
+                        </td>
+                    </tr>
+                </table>
+            </td>
+        </tr>
+    </table>
+    <hr style="margin-top: 20px;
+            margin-bottom: 20px;
+            border: 0;
+            border-top: 1px solid #eee;">
+    <h4 style="font-weight: 500;
+            line-height: 1.1;font-size:18px;">
+        <p>Best Wishes!</p>
+        <p style="margin: 0 0 10px;"><b>Kylin Team</b></p>
+    </h4>
+</div>
+</body>
+
+</html>
\ No newline at end of file
diff --git a/src/core-job/src/main/resources/mail_templates/LOAD_EMPTY_DATA.ftl b/src/core-job/src/main/resources/mail_templates/LOAD_EMPTY_DATA.ftl
new file mode 100644
index 0000000000..41400ee2de
--- /dev/null
+++ b/src/core-job/src/main/resources/mail_templates/LOAD_EMPTY_DATA.ftl
@@ -0,0 +1,200 @@
+<!--
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+-->
+<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN"
+        "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
+<html xmlns="http://www.w3.org/1999/xhtml">
+
+<head>
+    <meta http-equiv="Content-Type" content="text/html; charset=UTF-8"/>
+    <meta name="viewport" content="width=device-width, initial-scale=1.0"/>
+</head>
+
+<style>
+    html {
+        font-size: 10px;
+    }
+
+    * {
+        box-sizing: border-box;
+    }
+
+    a:hover,
+    a:focus {
+        color: #23527c;
+        text-decoration: underline;
+    }
+
+    a:focus {
+        outline: 5px auto -webkit-focus-ring-color;
+        outline-offset: -2px;
+    }
+</style>
+
+<body>
+<div style="margin-left:5%;margin-right:5%;font-family: 'Trebuchet MS ', Arial, Helvetica, sans-serif;">
+<span style="
+    line-height: 1.1;font-size: 18px;">
+    <p style="text-align:left;">Dear Kylin5 user,</p>
+    <p>We found that a job has loaded empty data in your Kylin system, as shown below.</p>
+    <p>It won't affect your system stability, and you may reload data by following instructions.</p>
+</span>
+    <hr style="margin-top: 20px;
+    margin-bottom: 20px;
+    border: 0;
+    border-top: 1px solid #eee;">
+    <h1>
+    <span style="display: inline;
+            background-color: #607D8B;
+            color: #fff;
+            line-height: 1;
+            font-weight: 700;
+            font-size:36px;
+            text-align: center;">&nbsp;LOADED EMPTY DATA&nbsp;</span>
+    </h1>
+    <hr style="margin-top: 20px;
+            margin-bottom: 20px;
+            border: 0;
+            border-top: 1px solid #eee;">
+    <table cellpadding="0" cellspacing="0" width="100%" style="border-collapse: collapse;border:1px solid #f8f8f8">
+
+        <tr>
+
+            <td style="padding: 10px 15px;
+                    background-color: #eeeeee;
+                    border:1px solid #f8f8f8;">
+                <h4 style="margin-top: 0;
+                        margin-bottom: 0;
+                        font-size: 16px;
+                        color: inherit;
+                        color: #404040;
+                        font-family: 'Trebuchet MS ', Arial, Helvetica, sans-serif;">
+                ${job_name}
+                </h4>
+            </td>
+        </tr>
+
+        <tr>
+
+            <td style="padding: 10px 15px;
+                    background-color: #eeeeee;
+                    border:1px solid #f8f8f8;">
+                <h4 style="margin-top: 0;
+                        margin-bottom: 0;
+                        font-size: 16px;
+                        color: inherit;
+                        color: #404040;
+                        font-family: 'Trebuchet MS ', Arial, Helvetica, sans-serif;">
+                ${env_name}
+                </h4>
+            </td>
+        </tr>
+
+        <tr>
+
+            <td style="padding: 15px;">
+                <table cellpadding="0" cellspacing="0" width="100%"
+                       style="margin-bottom: 20px;border:1px solid #ddd;border-collapse: collapse;font-family: 'Trebuchet MS ', Arial, Helvetica, sans-serif;">
+                    <tr>
+                        <th width="30%" style="padding: 8px;
+                                            line-height: 1.42857143;
+                                            vertical-align: top;
+                                            border: 1px solid #ddd;
+                                            text-align: left;
+                                            font-size: medium;
+                                            font-style: normal;">Submitter
+                        </th>
+                        <td style="padding: 8px;
+                                line-height: 1.42857143;
+                                vertical-align: top;
+                                border: 1px solid #ddd;
+                                font-size: medium;
+                                font-style: normal;">
+                        ${submitter}
+                        </td>
+                    </tr>
+                    <tr>
+                        <th width="30%" style="padding: 8px;
+                                            line-height: 1.42857143;
+                                            vertical-align: top;
+                                            border: 1px solid #ddd;
+                                            text-align: left;
+                                            font-size: medium;
+                                            font-style: normal;">Job Engine
+                        </th>
+                        <td style="padding: 8px;
+                                line-height: 1.42857143;
+                                vertical-align: top;
+                                border: 1px solid #ddd;
+                                font-size: medium;
+                                font-style: normal;">
+                        ${job_engine}
+                        </td>
+                    </tr>
+                    <tr>
+                        <th width="30%" style="padding: 8px;
+                                            line-height: 1.42857143;
+                                            vertical-align: top;
+                                            border: 1px solid #ddd;
+                                            text-align: left;
+                                            font-size: medium;
+                                            font-style: normal;">Project
+                        </th>
+                        <td style="padding: 8px;
+                                line-height: 1.42857143;
+                                vertical-align: top;
+                                border: 1px solid #ddd;
+                                font-size: medium;
+                                font-style: normal;">
+                        ${project_name}
+                        </td>
+                    </tr>
+                    <tr>
+                        <th width="30%" style="padding: 8px;
+                                            line-height: 1.42857143;
+                                            vertical-align: top;
+                                            border: 1px solid #ddd;
+                                            text-align: left;
+                                            font-size: medium;
+                                            font-style: normal;">Model Name
+                        </th>
+                        <td style="padding: 8px;
+                                line-height: 1.42857143;
+                                vertical-align: top;
+                                border: 1px solid #ddd;
+                                font-size: medium;
+                                font-style: normal;">
+                        ${model_name}
+                        </td>
+                    </tr>
+                </table>
+            </td>
+        </tr>
+    </table>
+    <hr style="margin-top: 20px;
+    margin-bottom: 20px;
+    border: 0;
+    border-top: 1px solid #eee;">
+    <h4 style="font-weight: 500;
+    line-height: 1.1;font-size:18px;">
+        <p>Best Wishes!</p>
+        <p style="margin: 0 0 10px;"><b>Kylin Team</b></p>
+    </h4>
+</div>
+</body>
+
+</html>
\ No newline at end of file
diff --git a/src/core-job/src/main/resources/mail_templates/METADATA_PERSIST_FAIL.ftl b/src/core-job/src/main/resources/mail_templates/METADATA_PERSIST_FAIL.ftl
new file mode 100644
index 0000000000..b18e34e273
--- /dev/null
+++ b/src/core-job/src/main/resources/mail_templates/METADATA_PERSIST_FAIL.ftl
@@ -0,0 +1,232 @@
+<!--
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+-->
+<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN"
+        "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
+<html xmlns="http://www.w3.org/1999/xhtml">
+
+<head>
+    <meta http-equiv="Content-Type" content="text/html; charset=UTF-8"/>
+    <meta name="viewport" content="width=device-width, initial-scale=1.0"/>
+</head>
+
+<style>
+    html {
+        font-size: 10px;
+    }
+
+    * {
+        box-sizing: border-box;
+    }
+
+    a:hover,
+    a:focus {
+        color: #23527c;
+        text-decoration: underline;
+    }
+
+    a:focus {
+        outline: 5px auto -webkit-focus-ring-color;
+        outline-offset: -2px;
+    }
+</style>
+
+<body>
+<div style="margin-left:5%;margin-right:5%;font-family: 'Trebuchet MS ', Arial, Helvetica, sans-serif;">
+        <span style="line-height: 1.1;font-size: 18px;">
+    <p style="text-align:left;">Dear Kylin5 user,</p>
+    <p>Kylin failed to update the job output due to an HDFS issue. Please ask the Hadoop service team for help as soon as possible.</p>
+</span>
+    <hr style="margin-top: 20px;
+            margin-bottom: 20px;
+            border: 0;
+            border-top: 1px solid #eee;">
+    <h1>
+            <span style="display: inline;
+                    background-color: #d9534f;
+                    color: #fff;
+                    line-height: 1;
+                    font-weight: 700;
+                    font-size:36px;
+                    text-align: center;">&nbsp;Metadata Persist Error&nbsp;</span>
+    </h1>
+    <hr style="margin-top: 20px;
+            margin-bottom: 20px;
+            border: 0;
+            border-top: 1px solid #eee;">
+    <table cellpadding="0" cellspacing="0" width="100%" style="border-collapse: collapse;border:1px solid #ebccd1;">
+
+        <tr>
+
+            <td style="padding: 10px 15px;
+                    background-color: #f2dede;
+                    border:1px solid #ebccd1;">
+                <h4 style="margin-top: 0;
+                        margin-bottom: 0;
+                        font-size: 16px;
+                        color: inherit;
+                        color: #a94442;
+                        font-family: 'Trebuchet MS ', Arial, Helvetica, sans-serif;">
+                ${job_name}
+                </h4>
+            </td>
+        </tr>
+
+        <tr>
+
+            <td style="padding: 10px 15px;
+                    background-color: #f2dede;
+                    border:1px solid #ebccd1;">
+                <h4 style="margin-top: 0;
+                        margin-bottom: 0;
+                        font-size: 16px;
+                        color: inherit;
+                        color: #a94442;
+                        font-family: 'Trebuchet MS ', Arial, Helvetica, sans-serif;">
+                ${env_name}
+                </h4>
+            </td>
+        </tr>
+
+        <tr>
+
+            <td style="padding: 15px;">
+                <table cellpadding="0" cellspacing="0" width="100%"
+                       style="margin-bottom: 20px;border:1px solid #ddd;border-collapse: collapse;font-family: 'Trebuchet MS ', Arial, Helvetica, sans-serif;">
+                    <tr>
+                        <th width="30%" style="padding: 8px;
+                                            line-height: 1.42857143;
+                                            vertical-align: top;
+                                            border: 1px solid #ddd;
+                                            text-align: left;
+                                            font-size: medium;
+                                            font-style: normal;">Submitter
+                        </th>
+                        <td style="padding: 8px;
+                                line-height: 1.42857143;
+                                vertical-align: top;
+                                border: 1px solid #ddd;
+                                font-size: medium;
+                                font-style: normal;">
+                        ${submitter}
+                        </td>
+                    </tr>
+                    <tr>
+                        <th width="30%" style="padding: 8px;
+                                            line-height: 1.42857143;
+                                            vertical-align: top;
+                                            border: 1px solid #ddd;
+                                            text-align: left;
+                                            font-size: medium;
+                                            font-style: normal;">Job Engine
+                        </th>
+                        <td style="padding: 8px;
+                                line-height: 1.42857143;
+                                vertical-align: top;
+                                border: 1px solid #ddd;
+                                font-size: medium;
+                                font-style: normal;">
+                        ${job_engine}
+                        </td>
+                    </tr>
+                    <tr>
+                        <th width="30%" style="padding: 8px;
+                                            line-height: 1.42857143;
+                                            vertical-align: top;
+                                            border: 1px solid #ddd;
+                                            text-align: left;
+                                            font-size: medium;
+                                            font-style: normal;">Project
+                        </th>
+                        <td style="padding: 8px;
+                                line-height: 1.42857143;
+                                vertical-align: top;
+                                border: 1px solid #ddd;
+                                font-size: medium;
+                                font-style: normal;">
+                            ${project_name}
+                        </td>
+                    </tr>
+                    <tr>
+                        <th width="30%" style="padding: 8px;
+                                            line-height: 1.42857143;
+                                            vertical-align: top;
+                                            border: 1px solid #ddd;
+                                            text-align: left;
+                                            font-size: medium;
+                                            font-style: normal;">Model Name
+                        </th>
+                        <td style="padding: 8px;
+                                line-height: 1.42857143;
+                                vertical-align: top;
+                                border: 1px solid #ddd;
+                                font-size: medium;
+                                font-style: normal;">
+                            ${model_name}
+                        </td>
+                    </tr>
+                </table>
+            </td>
+        </tr>
+
+        <tr>
+
+            <td style="padding: 10px 15px;
+                    background-color: #f2dede;
+                    border:1px solid #ebccd1;">
+                <h4 style="margin-top: 0;
+                        margin-bottom: 0;
+                        font-size: 16px;
+                        color: inherit;
+                        color: #a94442;
+                        font-family: 'Trebuchet MS ', Arial, Helvetica, sans-serif;">
+                    Logs
+                </h4>
+            </td>
+        </tr>
+        <tr>
+
+            <td style="padding: 15px;">
+                <table cellpadding="0" cellspacing="0" width="100%"
+                       style="margin-bottom: 20px;border:1px solid #ddd;border-collapse: collapse;font-family: 'Trebuchet MS ', Arial, Helvetica, sans-serif;">
+                    <tr>
+                        <td style="padding: 8px;
+                                line-height: 1.42857143;
+                                vertical-align: top;
+                                border: 1px solid #ddd;
+                                font-size: medium;
+                                font-style: normal;">
+                            <pre style="white-space: pre-wrap;">${error_log}</pre>
+                        </td>
+                    </tr>
+                </table>
+            </td>
+        </tr>
+    </table>
+    <hr style="margin-top: 20px;
+    margin-bottom: 20px;
+    border: 0;
+    border-top: 1px solid #eee;">
+    <h4 style="font-weight: 500;
+            line-height: 1.1;font-size:18px;">
+        <p>Best Wishes!</p>
+        <p style="margin: 0 0 10px;"><b>Kylin Team</b></p>
+    </h4>
+</div>
+</body>
+
+</html>
\ No newline at end of file
diff --git a/src/core-job/src/main/resources/mail_templates/OVER_CAPACITY_THRESHOLD.ftl b/src/core-job/src/main/resources/mail_templates/OVER_CAPACITY_THRESHOLD.ftl
new file mode 100644
index 0000000000..5114f1f3e8
--- /dev/null
+++ b/src/core-job/src/main/resources/mail_templates/OVER_CAPACITY_THRESHOLD.ftl
@@ -0,0 +1,200 @@
+<!--
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+-->
+<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN"
+        "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
+<html xmlns="http://www.w3.org/1999/xhtml">
+
+<head>
+    <meta http-equiv="Content-Type" content="Multipart/Alternative; charset=UTF-8"/>
+    <meta name="viewport" content="width=device-width, initial-scale=1.0"/>
+</head>
+
+<style>
+    html {
+        font-size: 10px;
+    }
+
+    * {
+        box-sizing: border-box;
+    }
+
+    a:hover,
+    a:focus {
+        color: #23527c;
+        text-decoration: underline;
+    }
+
+    a:focus {
+        outline: 5px auto -webkit-focus-ring-color;
+        outline-offset: -2px;
+    }
+</style>
+
+<body>
+<div style="margin-left:5%;margin-right:5%;font-family: 'Trebuchet MS ', Arial, Helvetica, sans-serif;">
+<span style="
+    line-height: 1.1;font-size: 18px;">
+    <p style="text-align:left;">Dear Kylin5 user,</p>
+    <p>The amount of data volume used has reached the threshold of the license’s limit.</p>
+    <p>To ensure the availability of your service, please contact Kylin team to get a new license, or try deleting some segments.</p>
+</span>
+    <hr style="margin-top: 20px;
+    margin-bottom: 20px;
+    border: 0;
+    border-top: 1px solid #eee;">
+    <h1>
+    <span style="display: inline;
+            background-color: #607D8B;
+            color: #fff;
+            line-height: 1;
+            font-weight: 700;
+            font-size:36px;
+            text-align: center;">&nbsp;Over Capacity Threshold&nbsp;</span>
+    </h1>
+    <hr style="margin-top: 20px;
+            margin-bottom: 20px;
+            border: 0;
+            border-top: 1px solid #eee;">
+    <table cellpadding="0" cellspacing="0" width="100%" style="border-collapse: collapse;border:1px solid #f8f8f8">
+
+        <tr>
+
+            <td style="padding: 10px 15px;
+                    background-color: #eeeeee;
+                    border:1px solid #f8f8f8;">
+                <h4 style="margin-top: 0;
+                        margin-bottom: 0;
+                        font-size: 16px;
+                        color: inherit;
+                        color: #404040;
+                        font-family: 'Trebuchet MS ', Arial, Helvetica, sans-serif;">
+                    CHECK USAGE
+                </h4>
+            </td>
+        </tr>
+
+        <tr>
+
+            <td style="padding: 10px 15px;
+                    background-color: #eeeeee;
+                    border:1px solid #f8f8f8;">
+                <h4 style="margin-top: 0;
+                        margin-bottom: 0;
+                        font-size: 16px;
+                        color: inherit;
+                        color: #404040;
+                        font-family: 'Trebuchet MS ', Arial, Helvetica, sans-serif;">
+                    ${env_name}
+                </h4>
+            </td>
+        </tr>
+
+        <tr>
+
+            <td style="padding: 15px;">
+                <table cellpadding="0" cellspacing="0" width="100%"
+                       style="margin-bottom: 20px;border:1px solid #ddd;border-collapse: collapse;font-family: 'Trebuchet MS ', Arial, Helvetica, sans-serif;">
+                    <tr>
+                        <th width="30%" style="padding: 8px;
+                                            line-height: 1.42857143;
+                                            vertical-align: top;
+                                            border: 1px solid #ddd;
+                                            text-align: left;
+                                            font-size: medium;
+                                            font-style: normal;">Resource Name
+                        </th>
+                        <td style="padding: 8px;
+                                line-height: 1.42857143;
+                                vertical-align: top;
+                                border: 1px solid #ddd;
+                                font-size: medium;
+                                font-style: normal;">
+                            ${resource_name}
+                        </td>
+                    </tr>
+                    <tr>
+                        <th width="30%" style="padding: 8px;
+                                            line-height: 1.42857143;
+                                            vertical-align: top;
+                                            border: 1px solid #ddd;
+                                            text-align: left;
+                                            font-size: medium;
+                                            font-style: normal;">Capacity Threshold
+                        </th>
+                        <td style="padding: 8px;
+                                line-height: 1.42857143;
+                                vertical-align: top;
+                                border: 1px solid #ddd;
+                                font-size: medium;
+                                font-style: normal;">
+                        ${capacity_threshold}%
+                        </td>
+                    </tr>
+                    <tr>
+                        <th width="30%" style="padding: 8px;
+                                            line-height: 1.42857143;
+                                            vertical-align: top;
+                                            border: 1px solid #ddd;
+                                            text-align: left;
+                                            font-size: medium;
+                                            font-style: normal;">Volume Total
+                        </th>
+                        <td style="padding: 8px;
+                                line-height: 1.42857143;
+                                vertical-align: top;
+                                border: 1px solid #ddd;
+                                font-size: medium;
+                                font-style: normal;">
+                        ${volume_total}
+                        </td>
+                    </tr>
+                    <tr>
+                        <th width="30%" style="padding: 8px;
+                                            line-height: 1.42857143;
+                                            vertical-align: top;
+                                            border: 1px solid #ddd;
+                                            text-align: left;
+                                            font-size: medium;
+                                            font-style: normal;">Volume Used
+                        </th>
+                        <td style="padding: 8px;
+                                line-height: 1.42857143;
+                                vertical-align: top;
+                                border: 1px solid #ddd;
+                                font-size: medium;
+                                font-style: normal;">
+                        ${volume_used}
+                        </td>
+                    </tr>
+                </table>
+            </td>
+        </tr>
+    </table>
+    <hr style="margin-top: 20px;
+    margin-bottom: 20px;
+    border: 0;
+    border-top: 1px solid #eee;">
+    <h4 style="font-weight: 500;
+    line-height: 1.1;font-size:18px;">
+        <p>Best Wishes!</p>
+        <p style="margin: 0 0 10px;"><b>Kylin Team</b></p>
+    </h4>
+</div>
+</body>
+
+</html>
\ No newline at end of file
diff --git a/src/core-job/src/main/resources/mail_templates/SOURCE_RECORDS_CHANGE.ftl b/src/core-job/src/main/resources/mail_templates/SOURCE_RECORDS_CHANGE.ftl
new file mode 100644
index 0000000000..40ac287b02
--- /dev/null
+++ b/src/core-job/src/main/resources/mail_templates/SOURCE_RECORDS_CHANGE.ftl
@@ -0,0 +1,205 @@
+<!--
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+-->
+<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN"
+        "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
+<html xmlns="http://www.w3.org/1999/xhtml">
+
+<head>
+    <meta http-equiv="Content-Type" content="Multipart/Alternative; charset=UTF-8"/>
+    <meta name="viewport" content="width=device-width, initial-scale=1.0"/>
+</head>
+
+<style>
+    html {
+        font-size: 10px;
+    }
+
+    * {
+        box-sizing: border-box;
+    }
+
+    a:hover,
+    a:focus {
+        color: #23527c;
+        text-decoration: underline;
+    }
+
+    a:focus {
+        outline: 5px auto -webkit-focus-ring-color;
+        outline-offset: -2px;
+    }
+</style>
+
+<body>
+<div style="margin-left:5%;margin-right:5%;font-family: 'Trebuchet MS ', Arial, Helvetica, sans-serif;">
+<span style="
+    line-height: 1.1;font-size: 18px;">
+    <p style="text-align:left;">Dear Kylin5 user,</p>
+    <p>We found that some source records were updated in your Kylin system. You can reload updated records by following the instructions below.</p>
+    <p>Ignoring this issue may cause inconsistent query results across different indexes.</p>
+    <#if start_time == "1970-01-01 08:00:00">
+        <p>You may refresh the full build segment to apply source records change.</p>
+    <#else>
+        <p>You may refresh the segment from ${start_time} to ${end_time} to apply source records change.</p>
+    </#if>
+</span>
+    <hr style="margin-top: 20px;
+    margin-bottom: 20px;
+    border: 0;
+    border-top: 1px solid #eee;">
+    <h1>
+    <span style="display: inline;
+            background-color: #607D8B;
+            color: #fff;
+            line-height: 1;
+            font-weight: 700;
+            font-size:36px;
+            text-align: center;">&nbsp;SOURCE RECORDS CHANGED&nbsp;</span>
+    </h1>
+    <hr style="margin-top: 20px;
+            margin-bottom: 20px;
+            border: 0;
+            border-top: 1px solid #eee;">
+    <table cellpadding="0" cellspacing="0" width="100%" style="border-collapse: collapse;border:1px solid #f8f8f8">
+
+        <tr>
+
+            <td style="padding: 10px 15px;
+                    background-color: #eeeeee;
+                    border:1px solid #f8f8f8;">
+                <h4 style="margin-top: 0;
+                        margin-bottom: 0;
+                        font-size: 16px;
+                        color: inherit;
+                        color: #404040;
+                        font-family: 'Trebuchet MS ', Arial, Helvetica, sans-serif;">
+                ${job_name}
+                </h4>
+            </td>
+        </tr>
+
+        <tr>
+
+            <td style="padding: 10px 15px;
+                    background-color: #eeeeee;
+                    border:1px solid #f8f8f8;">
+                <h4 style="margin-top: 0;
+                        margin-bottom: 0;
+                        font-size: 16px;
+                        color: inherit;
+                        color: #404040;
+                        font-family: 'Trebuchet MS ', Arial, Helvetica, sans-serif;">
+                ${env_name}
+                </h4>
+            </td>
+        </tr>
+
+        <tr>
+
+            <td style="padding: 15px;">
+                <table cellpadding="0" cellspacing="0" width="100%"
+                       style="margin-bottom: 20px;border:1px solid #ddd;border-collapse: collapse;font-family: 'Trebuchet MS ', Arial, Helvetica, sans-serif;">
+                    <tr>
+                        <th width="30%" style="padding: 8px;
+                                            line-height: 1.42857143;
+                                            vertical-align: top;
+                                            border: 1px solid #ddd;
+                                            text-align: left;
+                                            font-size: medium;
+                                            font-style: normal;">Submitter
+                        </th>
+                        <td style="padding: 8px;
+                                line-height: 1.42857143;
+                                vertical-align: top;
+                                border: 1px solid #ddd;
+                                font-size: medium;
+                                font-style: normal;">
+                        ${submitter}
+                        </td>
+                    </tr>
+                    <tr>
+                        <th width="30%" style="padding: 8px;
+                                            line-height: 1.42857143;
+                                            vertical-align: top;
+                                            border: 1px solid #ddd;
+                                            text-align: left;
+                                            font-size: medium;
+                                            font-style: normal;">Job Engine
+                        </th>
+                        <td style="padding: 8px;
+                                line-height: 1.42857143;
+                                vertical-align: top;
+                                border: 1px solid #ddd;
+                                font-size: medium;
+                                font-style: normal;">
+                        ${job_engine}
+                        </td>
+                    </tr>
+                    <tr>
+                        <th width="30%" style="padding: 8px;
+                                            line-height: 1.42857143;
+                                            vertical-align: top;
+                                            border: 1px solid #ddd;
+                                            text-align: left;
+                                            font-size: medium;
+                                            font-style: normal;">Project
+                        </th>
+                        <td style="padding: 8px;
+                                line-height: 1.42857143;
+                                vertical-align: top;
+                                border: 1px solid #ddd;
+                                font-size: medium;
+                                font-style: normal;">
+                        ${project_name}
+                        </td>
+                    </tr>
+                    <tr>
+                        <th width="30%" style="padding: 8px;
+                                            line-height: 1.42857143;
+                                            vertical-align: top;
+                                            border: 1px solid #ddd;
+                                            text-align: left;
+                                            font-size: medium;
+                                            font-style: normal;">Model Name
+                        </th>
+                        <td style="padding: 8px;
+                                line-height: 1.42857143;
+                                vertical-align: top;
+                                border: 1px solid #ddd;
+                                font-size: medium;
+                                font-style: normal;">
+                        ${model_name}
+                        </td>
+                    </tr>
+                </table>
+            </td>
+        </tr>
+    </table>
+    <hr style="margin-top: 20px;
+    margin-bottom: 20px;
+    border: 0;
+    border-top: 1px solid #eee;">
+    <h4 style="font-weight: 500;
+    line-height: 1.1;font-size:18px;">
+        <p>Best Wishes!</p>
+        <p style="margin: 0 0 10px;"><b>Kylin Team</b></p>
+    </h4>
+</div>
+</body>
+
+</html>
\ No newline at end of file
diff --git a/src/core-job/src/test/java/org/apache/kylin/job/execution/ErrorTestExecutable.java b/src/core-job/src/test/java/org/apache/kylin/job/execution/ErrorTestExecutable.java
index b0544d5974..24abfc8637 100644
--- a/src/core-job/src/test/java/org/apache/kylin/job/execution/ErrorTestExecutable.java
+++ b/src/core-job/src/test/java/org/apache/kylin/job/execution/ErrorTestExecutable.java
@@ -22,6 +22,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.exception.PersistentException;
 
 /**
  */
@@ -36,10 +37,11 @@ public class ErrorTestExecutable extends BaseTestExecutable {
     }
 
     @Override
-    public ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException, PersistentException, InterruptedException {
         Map<String, String> info = new HashMap<String, String>() {
             {
                 put("runningStatus", "inRunning");
+                put("mr_job_id", "38spvzt-981h-9831-vus8-9aklushnfbza");
             }
         };
         updateJobOutput(getProject(), getId(), ExecutableState.RUNNING, info, null, null);
diff --git a/src/core-job/src/test/java/org/apache/kylin/job/execution/NExecutableManagerTest.java b/src/core-job/src/test/java/org/apache/kylin/job/execution/NExecutableManagerTest.java
index 519339262a..8aa9bc85b5 100644
--- a/src/core-job/src/test/java/org/apache/kylin/job/execution/NExecutableManagerTest.java
+++ b/src/core-job/src/test/java/org/apache/kylin/job/execution/NExecutableManagerTest.java
@@ -49,12 +49,16 @@ import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.exception.KylinException;
 import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.MailHelper;
+import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.constant.JobIssueEnum;
 import org.apache.kylin.job.dao.NExecutableDao;
 import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.exception.PersistentException;
 import org.apache.kylin.job.impl.threadpool.NDefaultScheduler;
+import org.apache.kylin.metadata.cube.model.NDataLayout;
 import org.apache.kylin.metadata.model.SegmentRange;
 import org.apache.kylin.metadata.realization.RealizationStatusEnum;
 import org.apache.kylin.common.persistence.transaction.UnitOfWork;
@@ -599,18 +603,37 @@ public class NExecutableManagerTest extends NLocalFileMetadataTestCase {
         job.setParam(NBatchConstants.P_DATA_RANGE_START, SegmentRange.dateToLong(start) + "");
         job.setParam(NBatchConstants.P_DATA_RANGE_END, SegmentRange.dateToLong(end) + "");
         job.setTargetSubject("89af4ee2-2cdb-4b07-b39e-4c29856309aa");
-        EmailNotificationContent content = EmailNotificationContent.createContent(JobIssueEnum.JOB_ERROR, job);
-        Assert.assertTrue(content.getEmailTitle().contains(JobIssueEnum.JOB_ERROR.getDisplayName()));
-        Assert.assertTrue(!content.getEmailBody().contains("$"));
-        Assert.assertTrue(content.getEmailBody().contains(project));
-        Assert.assertTrue(content.getEmailBody().contains(job.getName()));
-
-        content = EmailNotificationContent.createContent(JobIssueEnum.LOAD_EMPTY_DATA, job);
-        Assert.assertTrue(content.getEmailBody().contains(job.getTargetModelAlias()));
+        Pair<String, String> mail = EmailNotificationContent.createContent(ExecutableState.ERROR, job, job.getTasks());
+        assert mail != null;
+        Assert.assertTrue(mail.getFirst().contains(ExecutableState.ERROR.toString()));
+        Assert.assertTrue(mail.getSecond().contains("Job Error Details"));
+        Assert.assertTrue(mail.getSecond().contains(project));
+        Assert.assertTrue(mail.getSecond().contains(job.getName()));
+
+        mail = EmailNotificationContent.createContent(JobIssueEnum.LOAD_EMPTY_DATA, job);
+        assert mail != null;
+        Assert.assertTrue(mail.getSecond().contains(job.getTargetModelAlias()));
         Assert.assertEquals("89af4ee2-2cdb-4b07-b39e-4c29856309aa", job.getTargetModelId());
-        content = EmailNotificationContent.createContent(JobIssueEnum.SOURCE_RECORDS_CHANGE, job);
-        Assert.assertTrue(content.getEmailBody().contains(start));
-        Assert.assertTrue(content.getEmailBody().contains(end));
+
+        mail = EmailNotificationContent.createContent(JobIssueEnum.SOURCE_RECORDS_CHANGE, job);
+        assert mail != null;
+        Assert.assertTrue(mail.getFirst().contains("Source Records Change"));
+        Assert.assertTrue(mail.getSecond().contains("reload updated records"));
+
+        Throwable exception = new Throwable("metadata persist failed!");
+        mail = EmailNotificationContent.createMetadataPersistExceptionContent(exception, job);
+        Assert.assertTrue(mail.getFirst().contains("METADATA_PERSIST"));
+        Assert.assertTrue(mail.getSecond().contains("Hadoop Service"));
+
+        mail = MailHelper.creatContentForCapacityUsage(1000000L, 10000L, project);
+        Assert.assertTrue(mail.getFirst().contains("OVER_CAPACITY_THRESHOLD"));
+        Assert.assertTrue(mail.getSecond().contains("deleting some segments"));
+
+        mail = EmailNotificationContent.createContent(null, job);
+        Assert.assertNull(mail);
+
+        mail = EmailNotificationContent.createContent(ExecutableState.READY, job, job.getTasks());
+        Assert.assertNull(mail);
 
     }
 
@@ -934,4 +957,60 @@ public class NExecutableManagerTest extends NLocalFileMetadataTestCase {
         manager.updateJobOutput(job.getId(), ExecutableState.RUNNING);
         executable.checkParentJobStatus();
     }
+
+    @Test
+    public void testMetadataPersistConfig() throws ExecuteException, PersistentException {
+        DefaultExecutable job = new DefaultExecutable();
+        job.setProject("default");
+        val executable = new SucceedTestExecutable();
+        executable.setProject("default");
+        job.addTask(executable);
+        manager.addJob(job);
+
+        executable.checkMetadataPersistConfig(null);
+
+        executable.handleMetadataPersistException(new PersistentException("test email"));
+
+        PersistentException persistentException = new PersistentException("test");
+        boolean flag = executable.isMetaDataPersistException(persistentException, 1);
+        Assert.assertTrue(flag);
+        ExecuteException executeException = new ExecuteException("test1", new Throwable());
+        flag = executable.isMetaDataPersistException(executeException, 1);
+        Assert.assertFalse(flag);
+        executable.checkMetadataPersistConfig(persistentException);
+
+        //cover default
+        DefaultExecutable defaultExecutable = new DefaultExecutable();
+        defaultExecutable.setProject("default");
+        job = new DefaultExecutable();
+        job.setProject("default");
+        job.addTask(defaultExecutable);
+        manager.addJob(job);
+
+        defaultExecutable.handleMetadataPersistException(new PersistentException("test email1"));
+
+        persistentException = new PersistentException("test");
+        flag = defaultExecutable.isMetaDataPersistException(persistentException, 1);
+        Assert.assertTrue(flag);
+        executeException = new ExecuteException("test1", new Throwable());
+        flag = defaultExecutable.isMetaDataPersistException(executeException, 1);
+        Assert.assertFalse(flag);
+
+    }
+
+    @Test
+    public void testLoadEmptyData() {
+        NDataLayout dataLayout = new NDataLayout();
+        NDataLayout[] nDataLayouts = {dataLayout};
+        DefaultExecutable job = new DefaultExecutable();
+        job.setProject("default");
+        val executable = new SucceedTestExecutable();
+        executable.setProject("default");
+        job.addTask(executable);
+        manager.addJob(job);
+        executable.notifyUserIfNecessary(nDataLayouts);
+
+    }
+
+
 }
diff --git a/src/core-job/src/test/java/org/apache/kylin/job/execution/SucceedTestExecutable.java b/src/core-job/src/test/java/org/apache/kylin/job/execution/SucceedTestExecutable.java
index 5d49a87de7..adf81e3c0d 100644
--- a/src/core-job/src/test/java/org/apache/kylin/job/execution/SucceedTestExecutable.java
+++ b/src/core-job/src/test/java/org/apache/kylin/job/execution/SucceedTestExecutable.java
@@ -18,6 +18,9 @@
 
 package org.apache.kylin.job.execution;
 
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.exception.PersistentException;
+
 import java.util.HashMap;
 import java.util.Map;
 
@@ -34,7 +37,7 @@ public class SucceedTestExecutable extends BaseTestExecutable {
     }
 
     @Override
-    public ExecuteResult doWork(ExecutableContext context) {
+    protected ExecuteResult doWork(ExecutableContext context) throws PersistentException, ExecuteException, InterruptedException {
         Map<String, String> info = new HashMap<String, String>() {
             {
                 put("runningStatus", "inRunning");
diff --git a/src/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/NDefaultSchedulerTest.java b/src/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/NDefaultSchedulerTest.java
index f6602d2cd1..69dd9f14f8 100644
--- a/src/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/NDefaultSchedulerTest.java
+++ b/src/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/NDefaultSchedulerTest.java
@@ -49,6 +49,7 @@ import org.apache.kylin.job.dao.ExecutablePO;
 import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.job.exception.JobStoppedException;
 import org.apache.kylin.job.exception.JobStoppedNonVoluntarilyException;
+import org.apache.kylin.job.exception.PersistentException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.BaseTestExecutable;
 import org.apache.kylin.job.execution.DefaultExecutable;
@@ -157,7 +158,7 @@ public class NDefaultSchedulerTest extends BaseSchedulerTest {
     }
 
     @Test
-    public void testGetOutputFromHDFSByJobId() throws IOException {
+    public void testGetOutputFromHDFSByJobId() throws IOException, PersistentException {
         File file = temporaryFolder.newFile("execute_output.json." + System.currentTimeMillis() + ".log");
         for (int i = 0; i < 200; i++) {
             Files.write(file.toPath(), String.format(Locale.ROOT, "lines: %s\n", i).getBytes(Charset.defaultCharset()),
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java
index cf4136d6d5..ed9f738cd3 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java
@@ -19,6 +19,7 @@
 package org.apache.kylin.metadata.project;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -28,6 +29,7 @@ import java.util.TreeSet;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.KylinConfigExt;
+import org.apache.kylin.common.constant.NonCustomProjectLevelConfig;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.RootPersistentEntity;
 import org.apache.kylin.common.util.StringUtil;
@@ -364,4 +366,12 @@ public class ProjectInstance extends RootPersistentEntity implements ISourceAwar
         return ResourceStore.getKylinMetaStore(this.config);
     }
 
+    public List<String> getEmailUsers() {
+        String users = this.getOverrideKylinProps().get(NonCustomProjectLevelConfig.NOTIFICATION_USER_EMAILS.getValue());
+        if(users != null) {
+            return Arrays.asList(StringUtil.split(users, ","));
+        }
+        return new ArrayList<>();
+    }
+
 }
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/sourceusage/SourceUsageManager.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/sourceusage/SourceUsageManager.java
index c9c3be30fb..882a085f6d 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/sourceusage/SourceUsageManager.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/sourceusage/SourceUsageManager.java
@@ -45,7 +45,7 @@ import org.apache.kylin.common.msg.MsgPicker;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.util.DateFormat;
 import org.apache.kylin.common.util.JsonUtil;
-import org.apache.kylin.common.util.MailHelper;
+import org.apache.kylin.common.util.MailTemplateProvider;
 import org.apache.kylin.metadata.MetadataConstants;
 import org.apache.kylin.metadata.cachesync.CachedCrudAssist;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
@@ -233,8 +233,8 @@ public class SourceUsageManager {
                         logger.info("Capacity usage is less than threshold, enable notification");
                     } else if (copyForWrite.isCapacityNotification() && config.isOverCapacityNotificationEnabled()
                             && isOverCapacityThreshold(copyForWrite)) {
-                        if (MailHelper.notifyUserForOverCapacity(copyForWrite.getLicenseCapacity(),
-                                copyForWrite.getCurrentCapacity())) {
+                        if (MailTemplateProvider.notifyUserForOverCapacity(copyForWrite.getLicenseCapacity(),
+                                copyForWrite.getCurrentCapacity(), usageRecord.resourceName())) {
                             copyForWrite.setCapacityNotification(false);
                             logger.info("Capacity usage is more than threshold, disable notification");
                         } else {
diff --git a/src/examples/test_case_data/localmeta/kylin.properties b/src/examples/test_case_data/localmeta/kylin.properties
index a8e324a447..239cfe4b0c 100755
--- a/src/examples/test_case_data/localmeta/kylin.properties
+++ b/src/examples/test_case_data/localmeta/kylin.properties
@@ -107,14 +107,25 @@ kylin.security.saml.context-server-port=443
 kylin.security.saml.context-path=/kylin
 
 ### MAIL ###
-# If true, will send email notification;
-#kylin.job.notification-enabled=true
+# If true, send email notifications for job states: Succeed, Error, Discard.
+kylin.job.notification-enabled=false
 #kylin.job.notification-mail-enable-starttls=true
-#kylin.job.notification-mail-host=smtp.office365.com
+kylin.job.notification-mail-host=smtp.office365.com
 #kylin.job.notification-mail-port=587
-#kylin.job.notification-mail-username=kylin@example.com
-#kylin.job.notification-mail-password=mypassword
-#kylin.job.notification-mail-sender=kylin@example.com
+kylin.job.notification-mail-username=kylin@example.com
+kylin.job.notification-mail-password=mypassword
+kylin.job.notification-mail-sender=kylin@example.com
+kylin.job.notification-admin-emails=kylin@example.com
+kylin.job.notification-enable-states=Succeed,Error,Discard
+#kylin.capacity.notification-emails=
+# Notify when a data load is empty
+#kylin.job.notification-on-empty-data-load=false
+# Notify when source records change
+#kylin.job.notification-on-source-records-change=false
+# Notify on metadata persist
+#kylin.job.notification-on-metadata-persist=false
+# Capacity threshold notification; only used in test cases
+#kylin.capacity.notification-enabled=false
 
 
 ### OTHER ###
diff --git a/src/job-service/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java b/src/job-service/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java
index 71266afd4c..0824ecb996 100644
--- a/src/job-service/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java
+++ b/src/job-service/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java
@@ -79,6 +79,7 @@ import org.apache.kylin.job.constant.JobStatusEnum;
 import org.apache.kylin.job.dao.ExecutableOutputPO;
 import org.apache.kylin.job.dao.ExecutablePO;
 import org.apache.kylin.job.dao.NExecutableDao;
+import org.apache.kylin.job.exception.PersistentException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.BaseTestExecutable;
 import org.apache.kylin.job.execution.ChainedExecutable;
@@ -1332,7 +1333,7 @@ public class JobServiceTest extends NLocalFileMetadataTestCase {
     }
 
     @Test
-    public void testGetJobOutput() {
+    public void testGetJobOutput() throws PersistentException {
         NExecutableManager manager = NExecutableManager.getInstance(jobService.getConfig(), "default");
         ExecutableOutputPO executableOutputPO = new ExecutableOutputPO();
         executableOutputPO.setStatus("SUCCEED");
@@ -1345,7 +1346,7 @@ public class JobServiceTest extends NLocalFileMetadataTestCase {
     }
 
     @Test
-    public void testGetAllJobOutput() throws IOException {
+    public void testGetAllJobOutput() throws IOException, PersistentException {
         File file = temporaryFolder.newFile("execute_output.json." + System.currentTimeMillis() + ".log");
         for (int i = 0; i < 200; i++) {
             Files.write(file.toPath(), String.format(Locale.ROOT, "lines: %s\n", i).getBytes(Charset.defaultCharset()),
@@ -1839,7 +1840,7 @@ public class JobServiceTest extends NLocalFileMetadataTestCase {
     }
 
     @Test
-    public void testGetStepOutput() {
+    public void testGetStepOutput() throws PersistentException {
         String jobId = "e1ad7bb0-522e-456a-859d-2eab1df448de";
         NExecutableManager manager = NExecutableManager.getInstance(jobService.getConfig(), "default");
         ExecutableOutputPO executableOutputPO = new ExecutableOutputPO();
diff --git a/src/metadata-server/src/main/java/org/apache/kylin/rest/controller/NProjectController.java b/src/metadata-server/src/main/java/org/apache/kylin/rest/controller/NProjectController.java
index abe3dd18cc..5827b5d6fe 100644
--- a/src/metadata-server/src/main/java/org/apache/kylin/rest/controller/NProjectController.java
+++ b/src/metadata-server/src/main/java/org/apache/kylin/rest/controller/NProjectController.java
@@ -285,9 +285,12 @@ public class NProjectController extends NBasicController {
             @RequestBody JobNotificationConfigRequest jobNotificationConfigRequest) {
         checkRequiredArg("data_load_empty_notification_enabled",
                 jobNotificationConfigRequest.getDataLoadEmptyNotificationEnabled());
-        checkRequiredArg("job_error_notification_enabled",
-                jobNotificationConfigRequest.getJobErrorNotificationEnabled());
-        checkRequiredArg("job_notification_emails", jobNotificationConfigRequest.getJobNotificationEmails());
+        checkRequiredArg("job_notification_states",
+                jobNotificationConfigRequest.getJobNotificationStates());
+        checkRequiredArg("job_notification_emails",
+                jobNotificationConfigRequest.getJobNotificationEmails());
+        checkRequiredArg("metadata_persist_notification_enabled",
+                jobNotificationConfigRequest.getMetadataPersistNotificationEnabled());
         projectService.updateJobNotificationConfig(project, jobNotificationConfigRequest);
         return new EnvelopeResponse<>(KylinException.CODE_SUCCESS, "", "");
     }
diff --git a/src/metadata-server/src/test/java/org/apache/kylin/rest/controller/NProjectControllerTest.java b/src/metadata-server/src/test/java/org/apache/kylin/rest/controller/NProjectControllerTest.java
index 23aff14ac5..d8235a9983 100644
--- a/src/metadata-server/src/test/java/org/apache/kylin/rest/controller/NProjectControllerTest.java
+++ b/src/metadata-server/src/test/java/org/apache/kylin/rest/controller/NProjectControllerTest.java
@@ -243,9 +243,10 @@ public class NProjectControllerTest extends NLocalFileMetadataTestCase {
     public void testUpdateJobNotificationConfig() throws Exception {
         val request = new JobNotificationConfigRequest();
 
-        request.setJobErrorNotificationEnabled(true);
+        request.setJobNotificationStates(Arrays.asList("Succeed", "Error", "Discard"));
         request.setDataLoadEmptyNotificationEnabled(true);
         request.setJobNotificationEmails(Arrays.asList("fff@g.com"));
+        request.setMetadataPersistNotificationEnabled(false);
 
         Mockito.doNothing().when(projectService).updateJobNotificationConfig("default", request);
         mockMvc.perform(MockMvcRequestBuilders.put("/api/projects/{project}/job_notification_config", "default")
@@ -467,7 +468,7 @@ public class NProjectControllerTest extends NLocalFileMetadataTestCase {
                 .andExpect(MockMvcResultMatchers.status().isOk());
 
         Mockito.verify(nProjectController).getNonCustomProjectConfigs();
-        Assert.assertEquals(19, getTestConfig().getNonCustomProjectConfigs().size());
+        Assert.assertEquals(20, getTestConfig().getNonCustomProjectConfigs().size());
     }
 
     @Test
diff --git a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ProjectServiceTest.java b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ProjectServiceTest.java
index 34160d76c6..6018a7465d 100644
--- a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ProjectServiceTest.java
+++ b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ProjectServiceTest.java
@@ -367,13 +367,15 @@ public class ProjectServiceTest extends NLocalFileMetadataTestCase {
         var response = projectService.getProjectConfig(project);
         val jobNotificationConfigRequest = new JobNotificationConfigRequest();
         jobNotificationConfigRequest.setDataLoadEmptyNotificationEnabled(false);
-        jobNotificationConfigRequest.setJobErrorNotificationEnabled(false);
+        jobNotificationConfigRequest.setJobNotificationStates(
+                Lists.newArrayList("Succeed", "Error", "Discard"));
         jobNotificationConfigRequest.setJobNotificationEmails(
                 Lists.newArrayList("user1@kyligence.io", "user2@kyligence.io", "user2@kyligence.io"));
+        jobNotificationConfigRequest.setMetadataPersistNotificationEnabled(false);
         projectService.updateJobNotificationConfig(project, jobNotificationConfigRequest);
         response = projectService.getProjectConfig(project);
         Assert.assertEquals(2, response.getJobNotificationEmails().size());
-        Assert.assertFalse(response.isJobErrorNotificationEnabled());
+        Assert.assertEquals(3, response.getJobNotificationStates().size());
         Assert.assertFalse(response.isDataLoadEmptyNotificationEnabled());
 
         jobNotificationConfigRequest
@@ -770,9 +772,11 @@ public class ProjectServiceTest extends NLocalFileMetadataTestCase {
 
         val jobNotificationConfigRequest = new JobNotificationConfigRequest();
         jobNotificationConfigRequest.setDataLoadEmptyNotificationEnabled(true);
-        jobNotificationConfigRequest.setJobErrorNotificationEnabled(true);
+        jobNotificationConfigRequest.setJobNotificationStates(
+                Lists.newArrayList("Succeed", "Error", "Discard"));
         jobNotificationConfigRequest.setJobNotificationEmails(
                 Lists.newArrayList("user1@kyligence.io", "user2@kyligence.io", "user2@kyligence.io"));
+        jobNotificationConfigRequest.setMetadataPersistNotificationEnabled(false);
         projectService.updateJobNotificationConfig(PROJECT, jobNotificationConfigRequest);
 
         projectService.updateQueryAccelerateThresholdConfig(PROJECT, 30, false);
@@ -788,12 +792,12 @@ public class ProjectServiceTest extends NLocalFileMetadataTestCase {
         updateProject();
         var response = projectService.getProjectConfig(PROJECT);
         Assert.assertEquals(2, response.getJobNotificationEmails().size());
-        Assert.assertTrue(response.isJobErrorNotificationEnabled());
+        Assert.assertEquals(3, response.getJobNotificationStates().size());
         Assert.assertTrue(response.isDataLoadEmptyNotificationEnabled());
 
         response = projectService.resetProjectConfig(PROJECT, "job_notification_config");
         Assert.assertEquals(0, response.getJobNotificationEmails().size());
-        Assert.assertFalse(response.isJobErrorNotificationEnabled());
+        Assert.assertEquals(3, response.getJobNotificationStates().size());
         Assert.assertFalse(response.isDataLoadEmptyNotificationEnabled());
 
         Assert.assertFalse(response.isFavoriteQueryTipsEnabled());
diff --git a/src/streaming/src/test/java/org/apache/kylin/streaming/jobs/StreamingDFMergeJobTest.java b/src/streaming/src/test/java/org/apache/kylin/streaming/jobs/StreamingDFMergeJobTest.java
index 182b255ed6..42dd156ed8 100644
--- a/src/streaming/src/test/java/org/apache/kylin/streaming/jobs/StreamingDFMergeJobTest.java
+++ b/src/streaming/src/test/java/org/apache/kylin/streaming/jobs/StreamingDFMergeJobTest.java
@@ -104,7 +104,7 @@ public class StreamingDFMergeJobTest extends StreamingTestCase {
         val flatTable = new CreateStreamingFlatTable(flatTableDesc, null, nSpanningTree, ss, null, null, null);
 
         val dataset = flatTable.generateStreamingDataset(config);
-        val builder = new StreamingDFBuildJob(PROJECT);
+        StreamingDFBuildJob builder = new StreamingDFBuildJob(PROJECT);
 
         val streamingEntry = new StreamingEntry();
         streamingEntry.parseParams(new String[] { PROJECT, DATAFLOW_ID, "1000", "", "xx" });