Posted to dev@kylin.apache.org by Chao Long <wa...@qq.com> on 2019/03/01 04:11:14 UTC
Re: Retry mechanism is invalid when build with spark
Hi mailpig,
I checked the code and reproduced this problem. I created a JIRA issue for it [https://issues.apache.org/jira/browse/KYLIN-3838]. If you have already fixed it, a PR is welcome.
------------------
Best Regards,
Chao Long
------------------ Original ------------------
From: "mailpig"<al...@163.com>;
Date: Mon, Feb 18, 2019 05:40 PM
To: "dev"<de...@kylin.apache.org>;
Subject: Retry mechanism is invalid when build with spark
In Kylin 2.5.2, the retry mechanism is invalid when building with Spark.
In SparkExecutable, when the Spark app fails, the callable returns a result
pair with exit code -1. The doWork method then returns an ExecuteResult whose
Throwable is null. Source code:
@SuppressWarnings("checkstyle:methodlength")
@Override
protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
    ExecutableManager mgr = getManager();
    Map<String, String> extra = mgr.getOutput(getId()).getExtra();
    if (extra.containsKey(ExecutableConstants.SPARK_JOB_ID)) {
        return onResumed(extra.get(ExecutableConstants.SPARK_JOB_ID), mgr);
    } else {
        String cubeName = this.getParam(SparkCubingByLayer.OPTION_CUBE_NAME.getOpt());
        CubeInstance cube = CubeManager.getInstance(context.getConfig()).getCube(cubeName);
        final KylinConfig config = cube.getConfig();
        setAlgorithmLayer();
        if (KylinConfig.getSparkHome() == null) {
            throw new NullPointerException();
        }
        if (config.getKylinJobJarPath() == null) {
            throw new NullPointerException();
        }
        String jars = this.getParam(JARS);

        // hadoop conf dir
        String hadoopConf = null;
        hadoopConf = System.getProperty("kylin.hadoop.conf.dir");
        if (StringUtils.isEmpty(hadoopConf)) {
            throw new RuntimeException(
                    "kylin_hadoop_conf_dir is empty, check if there's error in the output of 'kylin.sh start'");
        }
        logger.info("Using " + hadoopConf + " as HADOOP_CONF_DIR");

        String jobJar = config.getKylinJobJarPath();
        if (StringUtils.isEmpty(jars)) {
            jars = jobJar;
        }
        String segmentID = this.getParam(SparkCubingByLayer.OPTION_SEGMENT_ID.getOpt());
        CubeSegment segment = cube.getSegmentById(segmentID);
        Segments<CubeSegment> mergingSeg = cube.getMergingSegments(segment);
        dumpMetadata(segment, mergingSeg);

        StringBuilder stringBuilder = new StringBuilder();
        if (Shell.osType == Shell.OSType.OS_TYPE_WIN) {
            stringBuilder.append(
                    "set HADOOP_CONF_DIR=%s && %s/bin/spark-submit --class org.apache.kylin.common.util.SparkEntry ");
        } else {
            stringBuilder.append(
                    "export HADOOP_CONF_DIR=%s && %s/bin/spark-submit --class org.apache.kylin.common.util.SparkEntry ");
        }

        Map<String, String> sparkConfs = config.getSparkConfigOverride();
        String sparkConfigName = getSparkConfigName();
        if (sparkConfigName != null) {
            Map<String, String> sparkSpecificConfs = config.getSparkConfigOverrideWithSpecificName(sparkConfigName);
            sparkConfs.putAll(sparkSpecificConfs);
        }
        for (Map.Entry<String, String> entry : sparkConfs.entrySet()) {
            stringBuilder.append(" --conf ").append(entry.getKey()).append("=").append(entry.getValue()).append(" ");
        }

        stringBuilder.append("--jars %s %s %s");
        final String cmd = String.format(Locale.ROOT, stringBuilder.toString(), hadoopConf,
                KylinConfig.getSparkHome(), jars, jobJar, formatArgs());
        logger.info("cmd: " + cmd);

        final ExecutorService executorService = Executors.newSingleThreadExecutor();
        final CliCommandExecutor exec = new CliCommandExecutor();
        final PatternedLogger patternedLogger = new PatternedLogger(logger, new PatternedLogger.ILogListener() {
            @Override
            public void onLogEvent(String infoKey, Map<String, String> info) {
                // only care three properties here
                if (ExecutableConstants.SPARK_JOB_ID.equals(infoKey)
                        || ExecutableConstants.YARN_APP_ID.equals(infoKey)
                        || ExecutableConstants.YARN_APP_URL.equals(infoKey)) {
                    getManager().addJobInfo(getId(), info);
                }
            }
        });
        Callable callable = new Callable<Pair<Integer, String>>() {
            @Override
            public Pair<Integer, String> call() throws Exception {
                Pair<Integer, String> result;
                try {
                    result = exec.execute(cmd, patternedLogger);
                } catch (Exception e) {
                    logger.error("error run spark job:", e);
                    result = new Pair<>(-1, e.getMessage()); // <-- exception swallowed, only its message survives
                }
                return result;
            }
        };
        try {
            Future<Pair<Integer, String>> future = executorService.submit(callable);
            Pair<Integer, String> result = null;
            while (!isDiscarded() && !isPaused()) {
                if (future.isDone()) {
                    result = future.get();
                    break;
                } else {
                    Thread.sleep(5000);
                }
            }
            if (future.isDone() == false) { // user cancelled
                executorService.shutdownNow(); // interrupt
                extra = mgr.getOutput(getId()).getExtra();
                if (extra != null && extra.get(ExecutableConstants.SPARK_JOB_ID) != null) {
                    killAppRetry(extra.get(ExecutableConstants.SPARK_JOB_ID));
                }
                if (isDiscarded()) {
                    return new ExecuteResult(ExecuteResult.State.DISCARDED, "Discarded");
                }
                if (isPaused()) {
                    return new ExecuteResult(ExecuteResult.State.STOPPED, "Stopped");
                }
                throw new IllegalStateException();
            }
            if (result == null) {
                result = future.get();
            }
            if (result != null && result.getFirst() == 0) {
                // done, update all properties
                Map<String, String> joblogInfo = patternedLogger.getInfo();
                // read counter from hdfs
                String counterOutput = getParam(BatchConstants.ARG_COUNTER_OUPUT);
                if (counterOutput != null) {
                    if (HadoopUtil.getWorkingFileSystem().exists(new Path(counterOutput))) {
                        Map<String, String> counterMap = HadoopUtil.readFromSequenceFile(counterOutput);
                        joblogInfo.putAll(counterMap);
                    } else {
                        logger.warn("Spark counter output path not exists: " + counterOutput);
                    }
                }
                readCounters(joblogInfo);
                getManager().addJobInfo(getId(), joblogInfo);
                return new ExecuteResult(ExecuteResult.State.SUCCEED, patternedLogger.getBufferedLog());
            }
            // clear SPARK_JOB_ID on job failure.
            extra = mgr.getOutput(getId()).getExtra();
            extra.put(ExecutableConstants.SPARK_JOB_ID, "");
            getManager().addJobInfo(getId(), extra);
            return new ExecuteResult(ExecuteResult.State.ERROR, result != null ? result.getSecond() : ""); // <-- ERROR result built without a Throwable
        } catch (Exception e) {
            logger.error("error run spark job:", e);
            return ExecuteResult.createError(e);
        }
    }
}
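The two marked lines are the root cause: the callable converts the spark-submit
failure into a plain (exit code, message) pair, and doWork then builds the ERROR
ExecuteResult from that message alone. Distilled to its essentials (a sketch
reusing only the classes visible above, not a verbatim excerpt):

    // Sketch of the failure path, not verbatim Kylin source.
    Pair<Integer, String> result;
    try {
        result = exec.execute(cmd, patternedLogger);
    } catch (Exception e) {
        result = new Pair<>(-1, e.getMessage()); // the Throwable is dropped here
    }
    // the failure is reported with a bare message, so no Throwable is attached:
    return new ExecuteResult(ExecuteResult.State.ERROR, result.getSecond());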
However, Kylin's retry mechanism depends on the Throwable object in the
ExecuteResult. If the Throwable is null, it will not retry the job. Source
code:
public boolean needRetry(int retry, Throwable t) {
    if (retry > KylinConfig.getInstanceFromEnv().getJobRetry() || t == null // <-- a null Throwable disables retry
            || (this instanceof DefaultChainedExecutable)) {
        return false;
    } else {
        return isRetryableException(t.getClass().getName());
    }
}
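One possible direction for a fix (a sketch only, not necessarily the patch that
will land for KYLIN-3838): wrap the captured exit code and message in an
exception and return it via ExecuteResult.createError, so that needRetry
receives a non-null Throwable:

    // Sketch of a possible fix, not the committed KYLIN-3838 patch:
    // surface the failure as a Throwable so the retry check can inspect it.
    if (result != null && result.getFirst() != 0) {
        return ExecuteResult.createError(new RuntimeException(
                "spark job failed, exit code: " + result.getFirst()
                        + ", message: " + result.getSecond()));
    }

Whether the job is then actually retried still depends on isRetryableException
matching the class of the wrapped exception.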
Please check!
--
Sent from: http://apache-kylin.74782.x6.nabble.com/