Posted to commits@kylin.apache.org by sh...@apache.org on 2019/03/14 00:57:15 UTC

[kylin] branch engine-flink updated: KYLIN-3868 Implement Flink batch cubing job builder

This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch engine-flink
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/engine-flink by this push:
     new 3e5eda7  KYLIN-3868 Implement Flink batch cubing job builder
3e5eda7 is described below

commit 3e5eda7acddbb9fa6bb22b5251cbffd553826742
Author: yanghua <ya...@gmail.com>
AuthorDate: Wed Mar 13 23:07:53 2019 +0800

    KYLIN-3868 Implement Flink batch cubing job builder
---
 .../kylin/job/constant/ExecutableConstants.java    |   3 +
 .../engine/flink/FlinkBatchCubingJobBuilder2.java  | 125 ++++++
 .../apache/kylin/engine/flink/FlinkExecutable.java | 441 +++++++++++++++++++++
 .../engine/flink/FlinkOnYarnConfigMapping.java     |  69 ++++
 4 files changed, 638 insertions(+)

diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
index 5735a80..99a5254 100644
--- a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
+++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
@@ -30,6 +30,7 @@ public final class ExecutableConstants {
     public static final String YARN_APP_URL = "yarn_application_tracking_url";
     public static final String MR_JOB_ID = "mr_job_id";
     public static final String SPARK_JOB_ID = "spark_job_id";
+    public static final String FLINK_JOB_ID = "flink_job_id";
     public static final String HDFS_BYTES_WRITTEN = "hdfs_bytes_written";
     public static final String SOURCE_RECORDS_COUNT = "source_records_count";
     public static final String SOURCE_RECORDS_SIZE = "source_records_size";
@@ -47,6 +48,7 @@ public final class ExecutableConstants {
     public static final String STEP_NAME_BUILD_BASE_CUBOID = "Build Base Cuboid";
     public static final String STEP_NAME_BUILD_IN_MEM_CUBE = "Build Cube In-Mem";
     public static final String STEP_NAME_BUILD_SPARK_CUBE = "Build Cube with Spark";
+    public static final String STEP_NAME_BUILD_FLINK_CUBE = "Build Cube with Flink";
     public static final String STEP_NAME_BUILD_N_D_CUBOID = "Build N-Dimension Cuboid";
     public static final String STEP_NAME_GET_CUBOID_KEY_DISTRIBUTION = "Calculate HTable Region Splits";
     public static final String STEP_NAME_CREATE_HBASE_TABLE = "Create HTable";
@@ -73,4 +75,5 @@ public final class ExecutableConstants {
     public static final String STEP_NAME_MATERIALIZE_LOOKUP_TABLE_UPDATE_CUBE = "Update Cube Info";
 
     public static final String SPARK_SPECIFIC_CONFIG_NAME_MERGE_DICTIONARY = "mergedict";
+    public static final String FLINK_SPECIFIC_CONFIG_NAME_MERGE_DICTIONARY = "mergedict";
 }
diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkBatchCubingJobBuilder2.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkBatchCubingJobBuilder2.java
new file mode 100644
index 0000000..ba475bf
--- /dev/null
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkBatchCubingJobBuilder2.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.engine.flink;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.StorageURL;
+import org.apache.kylin.common.util.StringUtil;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.EngineFactory;
+import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.engine.mr.LookupMaterializeContext;
+import org.apache.kylin.job.JoinedFlatTable;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * The batch cubing job builder implemented with Apache Flink.
+ */
+public class FlinkBatchCubingJobBuilder2 extends JobBuilderSupport {
+
+    private static final Logger logger = LoggerFactory.getLogger(FlinkBatchCubingJobBuilder2.class);
+
+    private final IFlinkInput.IFlinkBatchCubingInputSide inputSide;
+    private final IFlinkOutput.IFlinkBatchCubingOutputSide outputSide;
+
+    public FlinkBatchCubingJobBuilder2(CubeSegment newSegment, String submitter) {
+        super(newSegment, submitter);
+        this.inputSide = FlinkUtil.getBatchCubingInputSide(seg);
+        this.outputSide = FlinkUtil.getBatchCubingOutputSide(seg);
+    }
+
+    public CubingJob build() {
+        logger.info("Flink job to BUILD segment " + seg);
+
+        final CubingJob result = CubingJob.createBuildJob(seg, submitter, config);
+        final String jobId = result.getId();
+        final String cuboidRootPath = getCuboidRootPath(jobId);
+
+        // Phase 1: Create Flat Table & Materialize Hive View in Lookup Tables
+        inputSide.addStepPhase1_CreateFlatTable(result);
+
+        // Phase 2: Build Dictionary
+        result.addTask(createFactDistinctColumnsStep(jobId));
+
+        if (isEnableUHCDictStep()) {
+            result.addTask(createBuildUHCDictStep(jobId));
+        }
+
+        result.addTask(createBuildDictionaryStep(jobId));
+        result.addTask(createSaveStatisticsStep(jobId));
+
+        // add materialize lookup tables if needed
+        LookupMaterializeContext lookupMaterializeContext = addMaterializeLookupTableSteps(result);
+
+        outputSide.addStepPhase2_BuildDictionary(result);
+
+        // Phase 3: Build Cube
+        addLayerCubingSteps(result, jobId, cuboidRootPath); // layer cubing, only selected algorithm will execute
+        outputSide.addStepPhase3_BuildCube(result);
+
+        // Phase 4: Update Metadata & Cleanup
+        result.addTask(createUpdateCubeInfoAfterBuildStep(jobId, lookupMaterializeContext));
+        inputSide.addStepPhase4_Cleanup(result);
+        outputSide.addStepPhase4_Cleanup(result);
+
+        return result;
+    }
+
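+    // Adds the layered cubing step: FlinkCubingByLayer builds the cuboids level by level in a single Flink job.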
+    protected void addLayerCubingSteps(final CubingJob result, final String jobId, final String cuboidRootPath) {
+        final FlinkExecutable flinkExecutable = new FlinkExecutable();
+        flinkExecutable.setClassName(FlinkCubingByLayer.class.getName());
+        configureFlinkJob(seg, flinkExecutable, jobId, cuboidRootPath);
+        result.addTask(flinkExecutable);
+    }
+
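+    /**
+     * Wires the common job parameters (cube name, segment id, flat table, metadata URL
+     * and cuboid output path) into the given Flink executable.
+     */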
+    public void configureFlinkJob(final CubeSegment seg, final FlinkExecutable flinkExecutable,
+            final String jobId, final String cuboidRootPath) {
+        final IJoinedFlatTableDesc flatTableDesc = EngineFactory.getJoinedFlatTableDesc(seg);
+        final String tablePath = JoinedFlatTable.getTableDir(flatTableDesc, getJobWorkingDir(jobId));
+        flinkExecutable.setParam(FlinkCubingByLayer.OPTION_CUBE_NAME.getOpt(), seg.getRealization().getName());
+        flinkExecutable.setParam(FlinkCubingByLayer.OPTION_SEGMENT_ID.getOpt(), seg.getUuid());
+        flinkExecutable.setParam(FlinkCubingByLayer.OPTION_INPUT_TABLE.getOpt(),
+                seg.getConfig().getHiveDatabaseForIntermediateTable() + "." + flatTableDesc.getTableName());
+        flinkExecutable.setParam(FlinkCubingByLayer.OPTION_INPUT_PATH.getOpt(),
+                tablePath);
+        flinkExecutable.setParam(FlinkCubingByLayer.OPTION_META_URL.getOpt(),
+                getSegmentMetadataUrl(seg.getConfig(), jobId));
+        flinkExecutable.setParam(FlinkCubingByLayer.OPTION_OUTPUT_PATH.getOpt(), cuboidRootPath);
+        flinkExecutable.setJobId(jobId);
+
+        StringBuilder jars = new StringBuilder();
+
+        StringUtil.appendWithSeparator(jars, seg.getConfig().getFlinkAdditionalJars());
+        flinkExecutable.setJars(jars.toString());
+        flinkExecutable.setName(ExecutableConstants.STEP_NAME_BUILD_FLINK_CUBE);
+    }
+
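+    /**
+     * Builds an "hdfs" StorageURL pointing at the dumped job metadata, so the Flink
+     * application can load the segment metadata from HDFS instead of the metadata store.
+     */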
+    public String getSegmentMetadataUrl(KylinConfig kylinConfig, String jobId) {
+        Map<String, String> param = new HashMap<>();
+        param.put("path", getDumpMetadataPath(jobId));
+        return new StorageURL(kylinConfig.getMetadataUrl().getIdentifier(), "hdfs", param).toString();
+    }
+
+}
diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkExecutable.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkExecutable.java
new file mode 100644
index 0000000..83cc815
--- /dev/null
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkExecutable.java
@@ -0,0 +1,441 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.engine.flink;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.Shell;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.KylinConfigExt;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.util.CliCommandExecutor;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.JobRelatedMetaUtil;
+import org.apache.kylin.job.common.PatternedLogger;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.exception.PersistentException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecutableManager;
+import org.apache.kylin.job.execution.ExecutableState;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.metadata.model.Segments;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+/**
+ * The Flink executable contains job submission specific business logic.
+ */
+public class FlinkExecutable extends AbstractExecutable {
+
+    private static final Logger logger = LoggerFactory.getLogger(FlinkExecutable.class);
+
+    private static final String CLASS_NAME = "className";
+    private static final String JARS = "jars";
+    private static final String JOB_ID = "jobId";
+    private static final String COUNTER_SAVE_AS = "CounterSaveAs";
+    private static final String CONFIG_NAME = "configName";
+
+    public void setClassName(String className) {
+        this.setParam(CLASS_NAME, className);
+    }
+
+    public void setJobId(String jobId) {
+        this.setParam(JOB_ID, jobId);
+    }
+
+    public void setJars(String jars) {
+        this.setParam(JARS, jars);
+    }
+
+    public void setCounterSaveAs(String value) {
+        this.setParam(COUNTER_SAVE_AS, value);
+    }
+
+    public void setCounterSaveAs(String value, String counterOutputPath) {
+        this.setParam(COUNTER_SAVE_AS, value);
+        this.setParam(BatchConstants.ARG_COUNTER_OUPUT, counterOutputPath);
+    }
+
+    public String getCounterSaveAs() {
+        return getParam(COUNTER_SAVE_AS);
+    }
+
+    public void setFlinkConfigName(String configName) {
+        this.setParam(CONFIG_NAME, configName);
+    }
+
+    public String getFlinkConfigName() {
+        return getParam(CONFIG_NAME);
+    }
+
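+    /**
+     * Turns the executable's params into command line arguments: the application class
+     * name is moved to the front, submit-only params (jars, job id, counter/config names)
+     * are skipped, and everything else is appended as "-key value" pairs.
+     */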
+    private String formatArgs() {
+        StringBuilder stringBuilder = new StringBuilder();
+        for (Map.Entry<String, String> entry : getParams().entrySet()) {
+            StringBuilder tmp = new StringBuilder();
+            tmp.append("-").append(entry.getKey()).append(" ").append(entry.getValue()).append(" ");
+            if (entry.getKey().equals(CLASS_NAME)) {
+                stringBuilder.insert(0, tmp);
+            } else if (entry.getKey().equals(JARS) || entry.getKey().equals(JOB_ID)
+                    || entry.getKey().equals(COUNTER_SAVE_AS) || entry.getKey().equals(CONFIG_NAME)) {
+                // these params are consumed by the submit step itself, not passed to the application
+                continue;
+            } else {
+                stringBuilder.append(tmp);
+            }
+        }
+        if (stringBuilder.length() > 0) {
+            return stringBuilder.substring(0, stringBuilder.length() - 1);
+        } else {
+            return StringUtils.EMPTY;
+        }
+    }
+
+    @SuppressWarnings("checkstyle:methodlength")
+    @Override
+    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException, PersistentException {
+        ExecutableManager manager = getManager();
+        Map<String, String> extra = manager.getOutput(getId()).getExtra();
+        String flinkJobId = extra.get(ExecutableConstants.FLINK_JOB_ID);
+        if (!StringUtils.isEmpty(flinkJobId)) {
+            return onResumed(flinkJobId, manager);
+        } else {
+            String cubeName = this.getParam(FlinkCubingByLayer.OPTION_CUBE_NAME.getOpt());
+            CubeInstance cube = CubeManager.getInstance(context.getConfig()).getCube(cubeName);
+
+            final KylinConfig config = cube.getConfig();
+
+            setAlgorithmLayer();
+
+            if (KylinConfig.getFlinkHome() == null) {
+                throw new NullPointerException("Flink home is not set");
+            }
+            if (config.getKylinJobJarPath() == null) {
+                throw new NullPointerException("Kylin job jar path is not set");
+            }
+
+            String jars = this.getParam(JARS);
+
+            String hadoopConf = System.getProperty("kylin.hadoop.conf.dir");
+
+            if (StringUtils.isEmpty(hadoopConf)) {
+                throw new RuntimeException(
+                        "kylin_hadoop_conf_dir is empty, check if there's an error in the output of 'kylin.sh start'");
+            }
+            }
+
+            logger.info("Using " + hadoopConf + " as HADOOP_CONF_DIR");
+
+            String jobJar = config.getKylinJobJarPath();
+            if (StringUtils.isEmpty(jars)) {
+                jars = jobJar;
+            }
+
+            String segmentID = this.getParam(FlinkCubingByLayer.OPTION_SEGMENT_ID.getOpt());
+            CubeSegment segment = cube.getSegmentById(segmentID);
+            Segments<CubeSegment> mergingSeg = cube.getMergingSegments(segment);
+            dumpMetadata(segment, mergingSeg);
+
+            StringBuilder sb = new StringBuilder();
+            if (Shell.osType == Shell.OSType.OS_TYPE_WIN) {
+                sb.append("set HADOOP_CONF_DIR=%s && export HADOOP_CLASSPATH=/usr/hdp/2.4.0.0-169/hadoop/ && %s/bin/flink run -m yarn-cluster ");
+            } else {
+                sb.append("export HADOOP_CONF_DIR=%s && export HADOOP_CLASSPATH=/usr/hdp/2.4.0.0-169/hadoop/ && %s/bin/flink run -m yarn-cluster ");
+            }
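+            // The template holds two %s placeholders so far (HADOOP_CONF_DIR and FLINK_HOME);
+            // the job jars and the formatted application arguments are appended further below.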
+
+            Map<String, String> flinkConfs = config.getFlinkConfigOverride();
+
+            String flinkConfigName = getFlinkConfigName();
+            if (flinkConfigName != null) {
+                Map<String, String> flinkSpecificConfs = config.getFlinkConfigOverrideWithSpecificName(flinkConfigName);
+                // job-specific overrides take precedence over the global Flink config override
+                flinkConfs.putAll(flinkSpecificConfs);
+            }
+
+            for (Map.Entry<String, String> entry : flinkConfs.entrySet()) {
+                if (!FlinkOnYarnConfigMapping.flinkOnYarnConfigMap.containsKey(entry.getKey())) {
+                    logger.warn("Unsupported Flink configuration pair : key[%s], value[%s]", entry.getKey(), entry.getValue());
+                    throw new IllegalArgumentException("Unsupported Flink configuration pair : key["
+                            + entry.getKey() + "], value[" + entry.getValue() + "]");
+                }
+
+                String onYarnConfigOptionKey = FlinkOnYarnConfigMapping.flinkOnYarnConfigMap.get(entry.getKey());
+
+                sb.append(" ").append(onYarnConfigOptionKey).append(" ").append(entry.getValue());
+            }
+
+            sb.append(" -c org.apache.kylin.common.util.FlinkEntry %s %s ");
+            final String cmd = String.format(Locale.ROOT, sb.toString(), hadoopConf,
+                    KylinConfig.getFlinkHome(), jars, formatArgs());
+            logger.info("cmd: " + cmd);
+            final ExecutorService executorService = Executors.newSingleThreadExecutor();
+            final CliCommandExecutor exec = new CliCommandExecutor();
+            final PatternedLogger patternedLogger = new PatternedLogger(logger, new PatternedLogger.ILogListener() {
+                @Override
+                public void onLogEvent(String infoKey, Map<String, String> info) {
+                    // only care about these three properties here
+                    if (ExecutableConstants.FLINK_JOB_ID.equals(infoKey)
+                            || ExecutableConstants.YARN_APP_ID.equals(infoKey)
+                            || ExecutableConstants.YARN_APP_URL.equals(infoKey)) {
+                        getManager().addJobInfo(getId(), info);
+                    }
+                }
+            });
+
+            Callable<Pair<Integer, String>> callable = new Callable<Pair<Integer, String>>() {
+                @Override
+                public Pair<Integer, String> call() throws Exception {
+                    Pair<Integer, String> result;
+                    try {
+                        result = exec.execute(cmd, patternedLogger);
+                    } catch (Exception e) {
+                        logger.error("error running Flink job:", e);
+                        result = new Pair<>(-1, e.getMessage());
+                    }
+                    return result;
+                }
+            };
+
+            try {
+                Future<Pair<Integer, String>> future = executorService.submit(callable);
+                Pair<Integer, String> result = null;
+                while (!isDiscarded() && !isPaused()) {
+                    if (future.isDone()) {
+                        result = future.get();
+                        break;
+                    } else {
+                        Thread.sleep(5000);
+                    }
+                }
+
+                if (!future.isDone()) { // user cancelled
+                    executorService.shutdownNow(); // interrupt
+                    extra = manager.getOutput(getId()).getExtra();
+                    if (extra != null && extra.get(ExecutableConstants.FLINK_JOB_ID) != null) {
+                        killAppRetry(extra.get(ExecutableConstants.FLINK_JOB_ID));
+                    }
+
+                    if (isDiscarded()) {
+                        return new ExecuteResult(ExecuteResult.State.DISCARDED, "Discarded");
+                    }
+                    if (isPaused()) {
+                        return new ExecuteResult(ExecuteResult.State.STOPPED, "Stopped");
+                    }
+
+                    throw new IllegalStateException("job is neither discarded nor paused, but the submit command has not finished");
+                }
+
+                if (result == null) {
+                    result = future.get();
+                }
+                if (result != null && result.getFirst() == 0) {
+                    // done, update all properties
+                    Map<String, String> joblogInfo = patternedLogger.getInfo();
+                    // read counter from hdfs
+                    String counterOutput = getParam(BatchConstants.ARG_COUNTER_OUPUT);
+                    if (counterOutput != null) {
+                        if (HadoopUtil.getWorkingFileSystem().exists(new Path(counterOutput))) {
+                            Map<String, String> counterMap = HadoopUtil.readFromSequenceFile(counterOutput);
+                            joblogInfo.putAll(counterMap);
+                        } else {
+                            logger.warn("Flink counter output path not exists: " + counterOutput);
+                        }
+                    }
+                    readCounters(joblogInfo);
+                    getManager().addJobInfo(getId(), joblogInfo);
+                    return new ExecuteResult(ExecuteResult.State.SUCCEED, patternedLogger.getBufferedLog());
+                }
+                // clear FLINK_JOB_ID on job failure.
+                extra = manager.getOutput(getId()).getExtra();
+                extra.put(ExecutableConstants.FLINK_JOB_ID, "");
+                getManager().addJobInfo(getId(), extra);
+                return new ExecuteResult(ExecuteResult.State.ERROR, result != null ? result.getSecond() : "");
+            } catch (Exception e) {
+                logger.error("error running Flink job:", e);
+                return ExecuteResult.createError(e);
+            }
+        }
+    }
+
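+    /**
+     * Handles the resume case: polls YARN for the state of the already-submitted
+     * application until it reaches a terminal state, or kills it when the Kylin job
+     * has been paused or discarded.
+     */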
+    private ExecuteResult onResumed(String appId, ExecutableManager mgr) throws ExecuteException {
+        Map<String, String> info = new HashMap<>();
+        try {
+            logger.info("flink_job_id:" + appId + " resumed");
+            info.put(ExecutableConstants.FLINK_JOB_ID, appId);
+
+            while (!isPaused() && !isDiscarded()) {
+                String status = getAppState(appId);
+
+                if (status.equals("FAILED") || status.equals("KILLED")) {
+                    mgr.updateJobOutput(getId(), ExecutableState.ERROR, null, appId + " has failed");
+                    return new ExecuteResult(ExecuteResult.State.FAILED, appId + " has failed");
+                }
+
+                if (status.equals("SUCCEEDED")) {
+                    mgr.addJobInfo(getId(), info);
+                    return new ExecuteResult(ExecuteResult.State.SUCCEED, appId + " has finished");
+                }
+
+                Thread.sleep(5000);
+
+            }
+
+            killAppRetry(appId);
+
+            if (isDiscarded()) {
+                return new ExecuteResult(ExecuteResult.State.DISCARDED, appId + " is discarded");
+            } else {
+                return new ExecuteResult(ExecuteResult.State.STOPPED, appId + " is stopped");
+            }
+        } catch (Exception e) {
+            logger.error("error run spark job:", e);
+            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+        }
+    }
+
+    private String getAppState(String appId) throws IOException {
+        CliCommandExecutor executor = KylinConfig.getInstanceFromEnv().getCliCommandExecutor();
+        PatternedLogger patternedLogger = new PatternedLogger(logger);
+        String stateCmd = String.format(Locale.ROOT, "yarn application -status %s", appId);
+        executor.execute(stateCmd, patternedLogger);
+        Map<String, String> info = patternedLogger.getInfo();
+        return info.get(ExecutableConstants.YARN_APP_STATE);
+    }
+
+    private void setAlgorithmLayer() {
+        ExecutableManager execMgr = ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv());
+        CubingJob cubingJob = (CubingJob) execMgr.getJob(this.getParam(JOB_ID));
+        cubingJob.setAlgorithm(CubingJob.AlgorithmEnum.LAYER);
+    }
+
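+    /**
+     * Dumps the segment metadata (plus dictionaries and cube statistics, and the merging
+     * segments' metadata if any) so the Flink application can read it from the metadata URL.
+     */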
+    private void dumpMetadata(CubeSegment segment, List<CubeSegment> mergingSeg) throws ExecuteException {
+        try {
+            if (mergingSeg == null || mergingSeg.size() == 0) {
+                attachSegmentMetadataWithDict(segment);
+            } else {
+                List<CubeSegment> allRelatedSegs = new ArrayList<>();
+                allRelatedSegs.add(segment);
+                allRelatedSegs.addAll(mergingSeg);
+                attachSegmentsMetadataWithDict(allRelatedSegs);
+            }
+        } catch (IOException e) {
+            throw new ExecuteException("meta dump failed", e);
+        }
+    }
+
+    private void attachSegmentMetadataWithDict(CubeSegment segment) throws IOException {
+        Set<String> dumpList = new LinkedHashSet<>();
+        dumpList.addAll(JobRelatedMetaUtil.collectCubeMetadata(segment.getCubeInstance()));
+        dumpList.addAll(segment.getDictionaryPaths());
+        ResourceStore rs = ResourceStore.getStore(segment.getConfig());
+        if (rs.exists(segment.getStatisticsResourcePath())) {
+            // cube statistics is not available for new segment
+            dumpList.add(segment.getStatisticsResourcePath());
+        }
+
+        JobRelatedMetaUtil.dumpAndUploadKylinPropsAndMetadata(dumpList, (KylinConfigExt) segment.getConfig(),
+                this.getParam(FlinkCubingByLayer.OPTION_META_URL.getOpt()));
+    }
+
+    private void attachSegmentsMetadataWithDict(List<CubeSegment> segments) throws IOException {
+        Set<String> dumpList = new LinkedHashSet<>();
+        dumpList.addAll(JobRelatedMetaUtil.collectCubeMetadata(segments.get(0).getCubeInstance()));
+        ResourceStore rs = ResourceStore.getStore(segments.get(0).getConfig());
+        for (CubeSegment segment : segments) {
+            dumpList.addAll(segment.getDictionaryPaths());
+            if (rs.exists(segment.getStatisticsResourcePath())) {
+                // cube statistics is not available for new segment
+                dumpList.add(segment.getStatisticsResourcePath());
+            }
+        }
+
+        JobRelatedMetaUtil.dumpAndUploadKylinPropsAndMetadata(dumpList, (KylinConfigExt) segments.get(0).getConfig(),
+                this.getParam(FlinkCubingByLayer.OPTION_META_URL.getOpt()));
+    }
+
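+    // Kills the YARN application, re-issuing the kill up to 5 times until YARN reports KILLED.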
+    private int killAppRetry(String appId) throws IOException, InterruptedException {
+        String state = getAppState(appId);
+        if ("SUCCEEDED".equals(state) || "FAILED".equals(state) || "KILLED".equals(state)) {
+            logger.warn(appId + " is in a final state, no need to kill");
+            return 0;
+        }
+
+        killApp(appId);
+
+        state = getAppState(appId);
+        int retry = 0;
+        while ((state == null || !state.equals("KILLED")) && retry < 5) {
+            killApp(appId);
+
+            Thread.sleep(1000);
+
+            state = getAppState(appId);
+            retry++;
+        }
+
+        if ("KILLED".equals(state)) {
+            logger.info(appId + " killed successfully");
+            return 0;
+        } else {
+            logger.info(appId + " killed failed");
+            return 1;
+        }
+    }
+
+    private void killApp(String appId) throws IOException, InterruptedException {
+        CliCommandExecutor executor = KylinConfig.getInstanceFromEnv().getCliCommandExecutor();
+        String killCmd = String.format(Locale.ROOT, "yarn application -kill %s", appId);
+        executor.execute(killCmd);
+    }
+
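+    // Maps the standard job counters (source records count/size, HDFS bytes written) onto
+    // the user-supplied names from the comma-separated COUNTER_SAVE_AS param.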
+    private void readCounters(final Map<String, String> info) {
+        String counterSaveAs = getCounterSaveAs();
+        if (counterSaveAs != null) {
+            String[] saveAsNames = counterSaveAs.split(",");
+            saveCounterAs(info.get(ExecutableConstants.SOURCE_RECORDS_COUNT), saveAsNames, 0, info);
+            saveCounterAs(info.get(ExecutableConstants.SOURCE_RECORDS_SIZE), saveAsNames, 1, info);
+            saveCounterAs(info.get(ExecutableConstants.HDFS_BYTES_WRITTEN), saveAsNames, 2, info);
+        }
+    }
+
+    private void saveCounterAs(String counter, String[] saveAsNames, int i, Map<String, String> info) {
+        if (saveAsNames.length > i && !StringUtils.isBlank(saveAsNames[i])) {
+            info.put(saveAsNames[i].trim(), counter);
+        }
+    }
+
+}
diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkOnYarnConfigMapping.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkOnYarnConfigMapping.java
new file mode 100644
index 0000000..105e9ec
--- /dev/null
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkOnYarnConfigMapping.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.engine.flink;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * A utility class which builds the mapping between Flink config option keys from
+ * flink-conf.yaml and the corresponding Flink on YARN command line options.
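+ * For example, the JobManager heap memory option (and its deprecated keys) maps to the
+ * "-yjm" command line option.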
+ */
+public class FlinkOnYarnConfigMapping {
+
+    public static final Map<String, String> flinkOnYarnConfigMap;
+
+    static {
+        flinkOnYarnConfigMap = new HashMap<>();
+
+        //mapping job manager heap size -> -yjm
+        ConfigOption<String> jmHeapSizeOption = JobManagerOptions.JOB_MANAGER_HEAP_MEMORY;
+        flinkOnYarnConfigMap.put(jmHeapSizeOption.key(), "-yjm");
+        if (jmHeapSizeOption.hasDeprecatedKeys()) {
+            Iterator<String> deprecatedKeyIterator = jmHeapSizeOption.deprecatedKeys().iterator();
+            while (deprecatedKeyIterator.hasNext()) {
+                flinkOnYarnConfigMap.put(deprecatedKeyIterator.next(), "-yjm");
+            }
+        }
+
+        //mapping task manager heap size -> -ytm
+        ConfigOption<String> tmHeapSizeOption = TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY;
+        flinkOnYarnConfigMap.put(tmHeapSizeOption.key(), "-ytm");
+        if (tmHeapSizeOption.hasDeprecatedKeys()) {
+            Iterator<String> deprecatedKeyIterator = tmHeapSizeOption.deprecatedKeys().iterator();
+            while (deprecatedKeyIterator.hasNext()) {
+                flinkOnYarnConfigMap.put(deprecatedKeyIterator.next(), "-ytm");
+            }
+        }
+
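+        //mapping number of task slots -> -ys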
+        ConfigOption<Integer> taskSlotNumOption = TaskManagerOptions.NUM_TASK_SLOTS;
+        flinkOnYarnConfigMap.put(taskSlotNumOption.key(), "-ys");
+        if (taskSlotNumOption.hasDeprecatedKeys()) {
+            Iterator<String> deprecatedKeyIterator = taskSlotNumOption.deprecatedKeys().iterator();
+            while (deprecatedKeyIterator.hasNext()) {
+                flinkOnYarnConfigMap.put(deprecatedKeyIterator.next(), "-ys");
+            }
+        }
+    }
+
+}