You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/09/12 11:21:19 UTC
[31/35] kylin git commit: KYLIN-1827 Send mail notification when runtime exception throws during build/merge cube
KYLIN-1827 Send mail notification when runtime exception throws during build/merge cube
Signed-off-by: shaofengshi <sh...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/0954176a
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/0954176a
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/0954176a
Branch: refs/heads/1.5.x-HBase1.x
Commit: 0954176adb50df25d0995a7fbeb68e64ee7aed79
Parents: c59d63d
Author: Ma,Gang <ga...@ebay.com>
Authored: Fri Jul 15 11:27:40 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Mon Sep 12 11:23:19 2016 +0800
----------------------------------------------------------------------
.../kylin/job/execution/AbstractExecutable.java | 123 ++++++++++++++-----
.../org/apache/kylin/engine/mr/CubingJob.java | 40 ++++++
2 files changed, 129 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/0954176a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
index 09f9b54..1eee5da 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
@@ -30,6 +30,7 @@ import org.apache.commons.lang3.tuple.Pair;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.MailService;
import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.exception.PersistentException;
import org.apache.kylin.job.impl.threadpool.DefaultContext;
import org.apache.kylin.job.manager.ExecutableManager;
import org.slf4j.Logger;
@@ -99,31 +100,62 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
Preconditions.checkArgument(executableContext instanceof DefaultContext);
ExecuteResult result = null;
+ try {
+ onExecuteStart(executableContext);
+ Throwable exception;
+ do {
+ if (retry > 0) {
+ logger.info("Retry " + retry);
+ }
+ exception = null;
+ result = null;
+ try {
+ result = doWork(executableContext);
+ } catch (Throwable e) {
+ logger.error("error running Executable: " + this.toString());
+ exception = e;
+ }
+ retry++;
+ } while (((result != null && result.succeed() == false) || exception != null) && needRetry() == true);
- onExecuteStart(executableContext);
- Throwable exception;
- do {
- if (retry > 0) {
- logger.info("Retry " + retry);
+ if (exception != null) {
+ onExecuteError(exception, executableContext);
+ throw new ExecuteException(exception);
}
- exception = null;
- result = null;
- try {
- result = doWork(executableContext);
- } catch (Throwable e) {
- logger.error("error running Executable: " + this.toString());
- exception = e;
+
+ onExecuteFinished(result, executableContext);
+ } catch (Exception e) {
+ if (isMetaDataPersistException(e)){
+ handleMetaDataPersistException(e);
+ }
+ if (e instanceof ExecuteException){
+ throw e;
+ } else {
+ throw new ExecuteException(e);
}
- retry++;
- } while (((result != null && result.succeed() == false) || exception != null) && needRetry() == true);
+ }
+ return result;
+ }
+
+ protected void handleMetaDataPersistException(Exception e) {
+ // do nothing.
+ }
- if (exception != null) {
- onExecuteError(exception, executableContext);
- throw new ExecuteException(exception);
+ private boolean isMetaDataPersistException(Exception e) {
+ if (e instanceof PersistentException){
+ return true;
}
- onExecuteFinished(result, executableContext);
- return result;
+ Throwable t = e.getCause();
+ int depth = 0;
+ while (t!= null && depth<5) {
+ depth ++;
+ if (t instanceof PersistentException){
+ return true;
+ }
+ t = t.getCause();
+ }
+ return false;
}
protected abstract ExecuteResult doWork(ExecutableContext context) throws ExecuteException;
@@ -209,29 +241,52 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
protected final void notifyUserStatusChange(ExecutableContext context, ExecutableState state) {
try {
- List<String> users = Lists.newArrayList();
- users.addAll(getNotifyList());
final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
- final String[] adminDls = kylinConfig.getAdminDls();
- if (null != adminDls) {
- for (String adminDl : adminDls) {
- users.add(adminDl);
- }
- }
+ List<String> users = getAllNofifyUsers(kylinConfig);
if (users.isEmpty()) {
logger.warn("no need to send email, user list is empty");
return;
}
final Pair<String, String> email = formatNotifications(context, state);
- if (email == null) {
- logger.warn("no need to send email, content is null");
+ doSendMail(kylinConfig,users,email);
+ } catch (Exception e) {
+ logger.error("error send email", e);
+ }
+ }
+
+ private List<String> getAllNofifyUsers(KylinConfig kylinConfig){
+ List<String> users = Lists.newArrayList();
+ users.addAll(getNotifyList());
+ final String[] adminDls = kylinConfig.getAdminDls();
+ if (null != adminDls) {
+ for (String adminDl : adminDls) {
+ users.add(adminDl);
+ }
+ }
+ return users;
+ }
+
+ private void doSendMail(KylinConfig kylinConfig, List<String> users, Pair<String,String> email){
+ if (email == null) {
+ logger.warn("no need to send email, content is null");
+ return;
+ }
+ logger.info("prepare to send email to:" + users);
+ logger.info("job name:" + getName());
+ logger.info("submitter:" + getSubmitter());
+ logger.info("notify list:" + users);
+ new MailService(kylinConfig).sendMail(users, email.getLeft(), email.getRight());
+ }
+
+ protected void sendMail(Pair<String, String> email) {
+ try {
+ final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+ List<String> users = getAllNofifyUsers(kylinConfig);
+ if (users.isEmpty()) {
+ logger.warn("no need to send email, user list is empty");
return;
}
- logger.info("prepare to send email to:" + users);
- logger.info("job name:" + getName());
- logger.info("submitter:" + getSubmitter());
- logger.info("notify list:" + users);
- new MailService(kylinConfig).sendMail(users, email.getLeft(), email.getRight());
+ doSendMail(kylinConfig, users, email);
} catch (Exception e) {
logger.error("error send email", e);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/0954176a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
index c9ffe14..9c7f57a 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
@@ -18,6 +18,8 @@
package org.apache.kylin.engine.mr;
+import java.io.PrintWriter;
+import java.io.StringWriter;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.text.SimpleDateFormat;
@@ -183,6 +185,44 @@ public class CubingJob extends DefaultChainedExecutable {
super.onExecuteFinished(result, executableContext);
}
+ /**
+ * build fail because the metadata store has problem.
+ * @param exception
+ */
+ @Override
+ protected void handleMetaDataPersistException(Exception exception) {
+ String title = "[ERROR] - [" + getDeployEnvName() + "] - [" + getProjectName() + "] - " + CubingExecutableUtil.getCubeName(this.getParams());
+ String content = ExecutableConstants.NOTIFY_EMAIL_TEMPLATE;
+ final String UNKNOWN = "UNKNOWN";
+ String errMsg = null;
+ if (exception != null) {
+ final StringWriter out = new StringWriter();
+ exception.printStackTrace(new PrintWriter(out));
+ errMsg = out.toString();
+ }
+
+ content = content.replaceAll("\\$\\{job_name\\}", getName());
+ content = content.replaceAll("\\$\\{result\\}", ExecutableState.ERROR.toString());
+ content = content.replaceAll("\\$\\{env_name\\}", getDeployEnvName());
+ content = content.replaceAll("\\$\\{project_name\\}", getProjectName());
+ content = content.replaceAll("\\$\\{cube_name\\}", CubingExecutableUtil.getCubeName(this.getParams()));
+ content = content.replaceAll("\\$\\{source_records_count\\}", UNKNOWN);
+ content = content.replaceAll("\\$\\{start_time\\}", UNKNOWN);
+ content = content.replaceAll("\\$\\{duration\\}", UNKNOWN);
+ content = content.replaceAll("\\$\\{mr_waiting\\}", UNKNOWN);
+ content = content.replaceAll("\\$\\{last_update_time\\}", UNKNOWN);
+ content = content.replaceAll("\\$\\{submitter\\}", StringUtil.noBlank(getSubmitter(), "missing submitter"));
+ content = content.replaceAll("\\$\\{error_log\\}", Matcher.quoteReplacement(StringUtil.noBlank(errMsg, "no error message")));
+
+ try {
+ InetAddress inetAddress = InetAddress.getLocalHost();
+ content = content.replaceAll("\\$\\{job_engine\\}", inetAddress.getCanonicalHostName());
+ } catch (UnknownHostException e) {
+ logger.warn(e.getLocalizedMessage(), e);
+ }
+ sendMail(Pair.of(title,content));
+ }
+
public long getMapReduceWaitTime() {
return getExtraInfoAsLong(MAP_REDUCE_WAIT_TIME, 0L);
}