Posted to dev@kylin.apache.org by Chao Long <wa...@qq.com> on 2019/03/01 04:11:14 UTC

Re: Retry mechanism is invalid when build with spark

Hi mailpig,
  I checked the code and reproduced this problem. I created a JIRA issue for it: https://issues.apache.org/jira/browse/KYLIN-3838. If you have already fixed it, a PR is welcome.
------------------
Best Regards,
Chao Long


------------------ Original ------------------
From:  "mailpig"<al...@163.com>;
Date:  Mon, Feb 18, 2019 05:40 PM
To:  "dev"<de...@kylin.apache.org>;

Subject:  Retry mechanism is invalid when build with spark



In kylin-2.5.2, the retry mechanism does not work when building with Spark.
In SparkExecutable, when the Spark app fails, it returns a result
with code -1. The doWork function then returns an ExecuteResult whose
Throwable object is null. The source code is:
    @SuppressWarnings("checkstyle:methodlength")
    @Override
    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
        ExecutableManager mgr = getManager();
        Map<String, String> extra = mgr.getOutput(getId()).getExtra();
        if (extra.containsKey(ExecutableConstants.SPARK_JOB_ID)) {
            return onResumed(extra.get(ExecutableConstants.SPARK_JOB_ID), mgr);
        } else {
            String cubeName = this.getParam(SparkCubingByLayer.OPTION_CUBE_NAME.getOpt());
            CubeInstance cube = CubeManager.getInstance(context.getConfig()).getCube(cubeName);
            final KylinConfig config = cube.getConfig();

            setAlgorithmLayer();

            if (KylinConfig.getSparkHome() == null) {
                throw new NullPointerException();
            }
            if (config.getKylinJobJarPath() == null) {
                throw new NullPointerException();
            }
            String jars = this.getParam(JARS);

            //hadoop conf dir
            String hadoopConf = null;
            hadoopConf = System.getProperty("kylin.hadoop.conf.dir");

            if (StringUtils.isEmpty(hadoopConf)) {
                throw new RuntimeException(
                        "kylin_hadoop_conf_dir is empty, check if there's error in the output of 'kylin.sh start'");
            }

            logger.info("Using " + hadoopConf + " as HADOOP_CONF_DIR");

            String jobJar = config.getKylinJobJarPath();
            if (StringUtils.isEmpty(jars)) {
                jars = jobJar;
            }

            String segmentID = this.getParam(SparkCubingByLayer.OPTION_SEGMENT_ID.getOpt());
            CubeSegment segment = cube.getSegmentById(segmentID);
            Segments<CubeSegment> mergingSeg = cube.getMergingSegments(segment);
            dumpMetadata(segment, mergingSeg);

            StringBuilder stringBuilder = new StringBuilder();
            if (Shell.osType == Shell.OSType.OS_TYPE_WIN) {
                stringBuilder.append(
                        "set HADOOP_CONF_DIR=%s && %s/bin/spark-submit --class org.apache.kylin.common.util.SparkEntry ");
            } else {
                stringBuilder.append(
                        "export HADOOP_CONF_DIR=%s && %s/bin/spark-submit --class org.apache.kylin.common.util.SparkEntry ");
            }

            Map<String, String> sparkConfs = config.getSparkConfigOverride();

            String sparkConfigName = getSparkConfigName();
            if (sparkConfigName != null) {
                Map<String, String> sparkSpecificConfs = config
                        .getSparkConfigOverrideWithSpecificName(sparkConfigName);
                sparkConfs.putAll(sparkSpecificConfs);
            }

            for (Map.Entry<String, String> entry : sparkConfs.entrySet()) {
                stringBuilder.append(" --conf ").append(entry.getKey()).append("=").append(entry.getValue())
                        .append(" ");
            }

            stringBuilder.append("--jars %s %s %s");
            final String cmd = String.format(Locale.ROOT, stringBuilder.toString(), hadoopConf,
                    KylinConfig.getSparkHome(), jars, jobJar, formatArgs());
            logger.info("cmd: " + cmd);
            final ExecutorService executorService = Executors.newSingleThreadExecutor();
            final CliCommandExecutor exec = new CliCommandExecutor();
            final PatternedLogger patternedLogger = new PatternedLogger(logger, new PatternedLogger.ILogListener() {
                @Override
                public void onLogEvent(String infoKey, Map<String, String> info) {
                    // only care three properties here
                    if (ExecutableConstants.SPARK_JOB_ID.equals(infoKey)
                            || ExecutableConstants.YARN_APP_ID.equals(infoKey)
                            || ExecutableConstants.YARN_APP_URL.equals(infoKey)) {
                        getManager().addJobInfo(getId(), info);
                    }
                }
            });
            Callable callable = new Callable<Pair<Integer, String>>() {
                @Override
                public Pair<Integer, String> call() throws Exception {
                    Pair<Integer, String> result;
                    try {
                        result = exec.execute(cmd, patternedLogger);
                    } catch (Exception e) {
                        logger.error("error run spark job:", e);
                        // the exception is reduced to code -1 plus its message
                        result = new Pair<>(-1, e.getMessage());
                    }
                    return result;
                }
            };
            try {
                Future<Pair<Integer, String>> future = executorService.submit(callable);
                Pair<Integer, String> result = null;
                while (!isDiscarded() && !isPaused()) {
                    if (future.isDone()) {
                        result = future.get();
                        break;
                    } else {
                        Thread.sleep(5000);
                    }
                }

                if (future.isDone() == false) { // user cancelled
                    executorService.shutdownNow(); // interrupt
                    extra = mgr.getOutput(getId()).getExtra();
                    if (extra != null && extra.get(ExecutableConstants.SPARK_JOB_ID) != null) {
                        killAppRetry(extra.get(ExecutableConstants.SPARK_JOB_ID));
                    }

                    if (isDiscarded()) {
                        return new ExecuteResult(ExecuteResult.State.DISCARDED, "Discarded");
                    }
                    if (isPaused()) {
                        return new ExecuteResult(ExecuteResult.State.STOPPED, "Stopped");
                    }

                    throw new IllegalStateException();
                }

                if (result == null) {
                    result = future.get();
                }
                if (result != null && result.getFirst() == 0) {
                    // done, update all properties
                    Map<String, String> joblogInfo = patternedLogger.getInfo();
                    // read counter from hdfs
                    String counterOutput = getParam(BatchConstants.ARG_COUNTER_OUPUT);
                    if (counterOutput != null) {
                        if (HadoopUtil.getWorkingFileSystem().exists(new Path(counterOutput))) {
                            Map<String, String> counterMap = HadoopUtil.readFromSequenceFile(counterOutput);
                            joblogInfo.putAll(counterMap);
                        } else {
                            logger.warn("Spark counter output path not exists: " + counterOutput);
                        }
                    }
                    readCounters(joblogInfo);
                    getManager().addJobInfo(getId(), joblogInfo);
                    return new ExecuteResult(ExecuteResult.State.SUCCEED, patternedLogger.getBufferedLog());
                }
                // clear SPARK_JOB_ID on job failure.
                extra = mgr.getOutput(getId()).getExtra();
                extra.put(ExecutableConstants.SPARK_JOB_ID, "");
                getManager().addJobInfo(getId(), extra);
                // the ERROR result below is built without a Throwable
                return new ExecuteResult(ExecuteResult.State.ERROR, result != null ? result.getSecond() : "");
            } catch (Exception e) {
                logger.error("error run spark job:", e);
                return ExecuteResult.createError(e);
            }
        }
    }
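
To make the difference between the two error paths concrete, here is a minimal sketch. ResultSketch is a hypothetical stand-in for ExecuteResult, not Kylin's class, and fromExitCode is an invented helper showing one possible way to attach an exception to a non-zero exit code. Whether such an exception would actually be retried depends on isRetryableException, which is not shown in this post.

```java
// Hypothetical stand-in for Kylin's ExecuteResult; it models only what matters here.
final class ResultSketch {
    enum State { SUCCEED, ERROR }

    final State state;
    final String output;
    final Throwable throwable; // stays null unless an exception is attached

    private ResultSketch(State state, String output, Throwable throwable) {
        this.state = state;
        this.output = output;
        this.throwable = throwable;
    }

    // Shape of `new ExecuteResult(State.ERROR, message)`: message only, no Throwable.
    static ResultSketch error(String output) {
        return new ResultSketch(State.ERROR, output, null);
    }

    // Shape of `ExecuteResult.createError(e)`: the exception travels with the result.
    static ResultSketch createError(Throwable t) {
        return new ResultSketch(State.ERROR, t.getMessage(), t);
    }

    // Hypothetical helper: turn a non-zero exit code into a result that carries an exception.
    static ResultSketch fromExitCode(int exitCode, String log) {
        if (exitCode == 0) {
            return new ResultSketch(State.SUCCEED, log, null);
        }
        return createError(new RuntimeException("Spark job exited with code " + exitCode + ": " + log));
    }

    public static void main(String[] args) {
        ResultSketch current = error("spark-submit failed");
        ResultSketch alternative = fromExitCode(-1, "spark-submit failed");
        System.out.println("current throwable: " + current.throwable);
        System.out.println("alternative throwable: " + alternative.throwable);
    }
}
```

The first result mirrors what doWork returns today on a failed Spark job; the second shows the same failure with an exception attached.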

However, Kylin's retry mechanism depends on the Throwable object in
ExecuteResult. If the Throwable is null, the job is not retried. The source
code is:
    public boolean needRetry(int retry, Throwable t) {
        if (retry > KylinConfig.getInstanceFromEnv().getJobRetry() || t == null // null Throwable: no retry
                || (this instanceof DefaultChainedExecutable)) {
            return false;
        } else {
            return isRetryableException(t.getClass().getName());
        }
    }
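
The effect of the null check can be seen in isolation with a simplified sketch. This is not Kylin's code: maxRetry and retryableNames are hypothetical parameters standing in for the configuration lookups, the lookup by class name is an assumption about what isRetryableException does, and the DefaultChainedExecutable check is left out.

```java
import java.io.IOException;
import java.util.Collections;
import java.util.Set;

final class RetrySketch {
    // Simplified form of the needRetry check. maxRetry and retryableNames are
    // hypothetical parameters standing in for Kylin's configuration lookups.
    static boolean needRetry(int retry, int maxRetry, Throwable t, Set<String> retryableNames) {
        if (retry > maxRetry || t == null) {
            return false; // a missing Throwable ends the retry decision here
        }
        // Assumed behaviour: retry only if the exception class name is listed.
        return retryableNames.contains(t.getClass().getName());
    }

    public static void main(String[] args) {
        Set<String> retryable = Collections.singleton("java.io.IOException");
        // Failed Spark job as reported today: no Throwable attached.
        System.out.println(needRetry(1, 3, null, retryable));
        // Same attempt count, but with a listed exception attached.
        System.out.println(needRetry(1, 3, new IOException("lost executor"), retryable));
    }
}
```

With a null Throwable the retry count and the configured exception list are never consulted, which matches the behaviour described above.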
 
Please check!

