Posted to commits@kylin.apache.org by sh...@apache.org on 2019/03/14 00:57:15 UTC
[kylin] branch engine-flink updated: KYLIN-3868 Implement Flink batch cubing job builder
This is an automated email from the ASF dual-hosted git repository.
shaofengshi pushed a commit to branch engine-flink
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/engine-flink by this push:
new 3e5eda7 KYLIN-3868 Implement Flink batch cubing job builder
3e5eda7 is described below
commit 3e5eda7acddbb9fa6bb22b5251cbffd553826742
Author: yanghua <ya...@gmail.com>
AuthorDate: Wed Mar 13 23:07:53 2019 +0800
KYLIN-3868 Implement Flink batch cubing job builder
---
.../kylin/job/constant/ExecutableConstants.java | 3 +
.../engine/flink/FlinkBatchCubingJobBuilder2.java | 125 ++++++
.../apache/kylin/engine/flink/FlinkExecutable.java | 441 +++++++++++++++++++++
.../engine/flink/FlinkOnYarnConfigMapping.java | 69 ++++
4 files changed, 638 insertions(+)
diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
index 5735a80..99a5254 100644
--- a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
+++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
@@ -30,6 +30,7 @@ public final class ExecutableConstants {
public static final String YARN_APP_URL = "yarn_application_tracking_url";
public static final String MR_JOB_ID = "mr_job_id";
public static final String SPARK_JOB_ID = "spark_job_id";
+ public static final String FLINK_JOB_ID = "flink_job_id";
public static final String HDFS_BYTES_WRITTEN = "hdfs_bytes_written";
public static final String SOURCE_RECORDS_COUNT = "source_records_count";
public static final String SOURCE_RECORDS_SIZE = "source_records_size";
@@ -47,6 +48,7 @@ public final class ExecutableConstants {
public static final String STEP_NAME_BUILD_BASE_CUBOID = "Build Base Cuboid";
public static final String STEP_NAME_BUILD_IN_MEM_CUBE = "Build Cube In-Mem";
public static final String STEP_NAME_BUILD_SPARK_CUBE = "Build Cube with Spark";
+ public static final String STEP_NAME_BUILD_FLINK_CUBE = "Build Cube with Flink";
public static final String STEP_NAME_BUILD_N_D_CUBOID = "Build N-Dimension Cuboid";
public static final String STEP_NAME_GET_CUBOID_KEY_DISTRIBUTION = "Calculate HTable Region Splits";
public static final String STEP_NAME_CREATE_HBASE_TABLE = "Create HTable";
@@ -73,4 +75,5 @@ public final class ExecutableConstants {
public static final String STEP_NAME_MATERIALIZE_LOOKUP_TABLE_UPDATE_CUBE = "Update Cube Info";
public static final String SPARK_SPECIFIC_CONFIG_NAME_MERGE_DICTIONARY = "mergedict";
+ public static final String FLINK_SPECIFIC_CONFIG_NAME_MERGE_DICTIONARY = "mergedict";
}
diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkBatchCubingJobBuilder2.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkBatchCubingJobBuilder2.java
new file mode 100644
index 0000000..ba475bf
--- /dev/null
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkBatchCubingJobBuilder2.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.engine.flink;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.StorageURL;
+import org.apache.kylin.common.util.StringUtil;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.EngineFactory;
+import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.engine.mr.LookupMaterializeContext;
+import org.apache.kylin.job.JoinedFlatTable;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * The batch cubing job builder implemented with Apache Flink.
+ */
+public class FlinkBatchCubingJobBuilder2 extends JobBuilderSupport {
+
+ private static final Logger logger = LoggerFactory.getLogger(FlinkBatchCubingJobBuilder2.class);
+
+ private final IFlinkInput.IFlinkBatchCubingInputSide inputSide;
+ private final IFlinkOutput.IFlinkBatchCubingOutputSide outputSide;
+
+ public FlinkBatchCubingJobBuilder2(CubeSegment newSegment, String submitter) {
+ super(newSegment, submitter);
+ this.inputSide = FlinkUtil.getBatchCubingInputSide(seg);
+ this.outputSide = FlinkUtil.getBatchCubingOutputSide(seg);
+ }
+
+ public CubingJob build() {
+ logger.info("Flink job to BUILD segment " + seg);
+
+ final CubingJob result = CubingJob.createBuildJob(seg, submitter, config);
+ final String jobId = result.getId();
+ final String cuboidRootPath = getCuboidRootPath(jobId);
+
+ // Phase 1: Create Flat Table & Materialize Hive View in Lookup Tables
+ inputSide.addStepPhase1_CreateFlatTable(result);
+
+ // Phase 2: Build Dictionary
+ result.addTask(createFactDistinctColumnsStep(jobId));
+
+ if (isEnableUHCDictStep()) {
+ result.addTask(createBuildUHCDictStep(jobId));
+ }
+
+ result.addTask(createBuildDictionaryStep(jobId));
+ result.addTask(createSaveStatisticsStep(jobId));
+
+ // add materialize lookup tables if needed
+ LookupMaterializeContext lookupMaterializeContext = addMaterializeLookupTableSteps(result);
+
+ outputSide.addStepPhase2_BuildDictionary(result);
+
+ // Phase 3: Build Cube
+ addLayerCubingSteps(result, jobId, cuboidRootPath); // layer cubing; only the selected algorithm will execute
+ outputSide.addStepPhase3_BuildCube(result);
+
+ // Phase 4: Update Metadata & Cleanup
+ result.addTask(createUpdateCubeInfoAfterBuildStep(jobId, lookupMaterializeContext));
+ inputSide.addStepPhase4_Cleanup(result);
+ outputSide.addStepPhase4_Cleanup(result);
+
+ return result;
+ }
+
+ protected void addLayerCubingSteps(final CubingJob result, final String jobId, final String cuboidRootPath) {
+ final FlinkExecutable flinkExecutable = new FlinkExecutable();
+ flinkExecutable.setClassName(FlinkCubingByLayer.class.getName());
+ configureFlinkJob(seg, flinkExecutable, jobId, cuboidRootPath);
+ result.addTask(flinkExecutable);
+ }
+
+ public void configureFlinkJob(final CubeSegment seg, final FlinkExecutable flinkExecutable,
+ final String jobId, final String cuboidRootPath) {
+ final IJoinedFlatTableDesc flatTableDesc = EngineFactory.getJoinedFlatTableDesc(seg);
+ final String tablePath = JoinedFlatTable.getTableDir(flatTableDesc, getJobWorkingDir(jobId));
+ flinkExecutable.setParam(FlinkCubingByLayer.OPTION_CUBE_NAME.getOpt(), seg.getRealization().getName());
+ flinkExecutable.setParam(FlinkCubingByLayer.OPTION_SEGMENT_ID.getOpt(), seg.getUuid());
+ flinkExecutable.setParam(FlinkCubingByLayer.OPTION_INPUT_TABLE.getOpt(),
+ seg.getConfig().getHiveDatabaseForIntermediateTable() + "." + flatTableDesc.getTableName());
+ flinkExecutable.setParam(FlinkCubingByLayer.OPTION_INPUT_PATH.getOpt(),
+ tablePath);
+ flinkExecutable.setParam(FlinkCubingByLayer.OPTION_META_URL.getOpt(),
+ getSegmentMetadataUrl(seg.getConfig(), jobId));
+ flinkExecutable.setParam(FlinkCubingByLayer.OPTION_OUTPUT_PATH.getOpt(), cuboidRootPath);
+ flinkExecutable.setJobId(jobId);
+
+ StringBuilder jars = new StringBuilder();
+
+ StringUtil.appendWithSeparator(jars, seg.getConfig().getFlinkAdditionalJars());
+ flinkExecutable.setJars(jars.toString());
+ flinkExecutable.setName(ExecutableConstants.STEP_NAME_BUILD_FLINK_CUBE);
+ }
+
+ public String getSegmentMetadataUrl(KylinConfig kylinConfig, String jobId) {
+ Map<String, String> param = new HashMap<>();
+ param.put("path", getDumpMetadataPath(jobId));
+ return new StorageURL(kylinConfig.getMetadataUrl().getIdentifier(), "hdfs", param).toString();
+ }
+
+}
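For context, a minimal sketch (not part of this commit) of how this builder is
expected to be driven, assuming the same submission path Kylin uses for the MR
and Spark engines; the cube name and submitter below are placeholders:

    // types come from the org.apache.kylin.* imports shown in the diff above
    KylinConfig config = KylinConfig.getInstanceFromEnv();
    CubeSegment segment = CubeManager.getInstance(config)
            .getCube("my_cube").getSegments().get(0);
    CubingJob job = new FlinkBatchCubingJobBuilder2(segment, "ADMIN").build();
    ExecutableManager.getInstance(config).addJob(job);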
diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkExecutable.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkExecutable.java
new file mode 100644
index 0000000..83cc815
--- /dev/null
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkExecutable.java
@@ -0,0 +1,441 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.engine.flink;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.Shell;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.KylinConfigExt;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.util.CliCommandExecutor;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.JobRelatedMetaUtil;
+import org.apache.kylin.job.common.PatternedLogger;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.exception.PersistentException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecutableManager;
+import org.apache.kylin.job.execution.ExecutableState;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.metadata.model.Segments;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+/**
+ * The Flink executable. It contains the job-submission-specific business logic:
+ * assembling the "flink run" command, tracking the YARN application, and
+ * handling resume and kill.
+ */
+public class FlinkExecutable extends AbstractExecutable {
+
+ private static final Logger logger = LoggerFactory.getLogger(FlinkExecutable.class);
+
+ private static final String CLASS_NAME = "className";
+ private static final String JARS = "jars";
+ private static final String JOB_ID = "jobId";
+ private static final String COUNTER_SAVE_AS = "CounterSaveAs";
+ private static final String CONFIG_NAME = "configName";
+
+ public void setClassName(String className) {
+ this.setParam(CLASS_NAME, className);
+ }
+
+ public void setJobId(String jobId) {
+ this.setParam(JOB_ID, jobId);
+ }
+
+ public void setJars(String jars) {
+ this.setParam(JARS, jars);
+ }
+
+ public void setCounterSaveAs(String value) {
+ this.setParam(COUNTER_SAVE_AS, value);
+ }
+
+ public void setCounterSaveAs(String value, String counterOutputPath) {
+ this.setParam(COUNTER_SAVE_AS, value);
+ this.setParam(BatchConstants.ARG_COUNTER_OUPUT, counterOutputPath);
+ }
+
+ public String getCounterSaveAs() {
+ return getParam(COUNTER_SAVE_AS);
+ }
+
+ public void setFlinkConfigName(String configName) {
+ this.setParam(CONFIG_NAME, configName);
+ }
+
+ public String getFlinkConfigName() {
+ return getParam(CONFIG_NAME);
+ }
+
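+ /**
+ * Formats the job params into the application argument string, e.g. params
+ * {className=Foo, cubename=c} become "-className Foo -cubename c"
+ * (illustrative); the class name is moved to the front and engine-internal
+ * params (jars, jobId, counter/config names) are skipped.
+ */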
+ private String formatArgs() {
+ StringBuilder stringBuilder = new StringBuilder();
+ for (Map.Entry<String, String> entry : getParams().entrySet()) {
+ StringBuilder tmp = new StringBuilder();
+ tmp.append("-").append(entry.getKey()).append(" ").append(entry.getValue()).append(" ");
+ if (entry.getKey().equals(CLASS_NAME)) {
+ stringBuilder.insert(0, tmp);
+ } else if (entry.getKey().equals(JARS) || entry.getKey().equals(JOB_ID)
+ || entry.getKey().equals(COUNTER_SAVE_AS) || entry.getKey().equals(CONFIG_NAME)) {
+ // these params are consumed by the submit command / the executable itself, not by the app
+ continue;
+ } else {
+ stringBuilder.append(tmp);
+ }
+ }
+ if (stringBuilder.length() > 0) {
+ return stringBuilder.substring(0, stringBuilder.length() - 1);
+ } else {
+ return StringUtils.EMPTY;
+ }
+ }
+
+ @SuppressWarnings("checkstyle:methodlength")
+ @Override
+ protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException, PersistentException {
+ ExecutableManager manager = getManager();
+ Map<String, String> extra = manager.getOutput(getId()).getExtra();
+ String flinkJobId = extra.get(ExecutableConstants.FLINK_JOB_ID);
+ if (!StringUtils.isEmpty(flinkJobId)) {
+ return onResumed(flinkJobId, manager);
+ } else {
+ String cubeName = this.getParam(FlinkCubingByLayer.OPTION_CUBE_NAME.getOpt());
+ CubeInstance cube = CubeManager.getInstance(context.getConfig()).getCube(cubeName);
+
+ final KylinConfig config = cube.getConfig();
+
+ setAlgorithmLayer();
+
+ if (KylinConfig.getFlinkHome() == null) {
+ throw new NullPointerException("Flink home is not set");
+ }
+ if (config.getKylinJobJarPath() == null) {
+ throw new NullPointerException("Kylin job jar path is not set");
+ }
+
+ String jars = this.getParam(JARS);
+
+ String hadoopConf = System.getProperty("kylin.hadoop.conf.dir");
+
+ if (StringUtils.isEmpty(hadoopConf)) {
+ throw new RuntimeException(
+ "kylin_hadoop_conf_dir is empty; check for errors in the output of 'kylin.sh start'");
+ }
+
+ logger.info("Using " + hadoopConf + " as HADOOP_CONF_DIR");
+
+ String jobJar = config.getKylinJobJarPath();
+ if (StringUtils.isEmpty(jars)) {
+ jars = jobJar;
+ }
+
+ String segmentID = this.getParam(FlinkCubingByLayer.OPTION_SEGMENT_ID.getOpt());
+ CubeSegment segment = cube.getSegmentById(segmentID);
+ Segments<CubeSegment> mergingSeg = cube.getMergingSegments(segment);
+ dumpMetadata(segment, mergingSeg);
+
+ StringBuilder sb = new StringBuilder();
+ if (Shell.osType == Shell.OSType.OS_TYPE_WIN) {
+ sb.append("set HADOOP_CONF_DIR=%s && set HADOOP_CLASSPATH=/usr/hdp/2.4.0.0-169/hadoop/ && %s/bin/flink run -m yarn-cluster ");
+ } else {
+ sb.append("export HADOOP_CONF_DIR=%s && export HADOOP_CLASSPATH=/usr/hdp/2.4.0.0-169/hadoop/ && %s/bin/flink run -m yarn-cluster ");
+ }
+
+ Map<String, String> flinkConfs = config.getFlinkConfigOverride();
+
+ String flinkConfigName = getFlinkConfigName();
+ if (flinkConfigName != null) {
+ // merge the named overrides into the map that is actually used below
+ Map<String, String> flinkSpecificConfs = config.getFlinkConfigOverrideWithSpecificName(flinkConfigName);
+ flinkConfs.putAll(flinkSpecificConfs);
+ }
+
+ for (Map.Entry<String, String> entry : flinkConfs.entrySet()) {
+ if (!FlinkOnYarnConfigMapping.flinkOnYarnConfigMap.containsKey(entry.getKey())) {
+ logger.warn("Unsupported Flink configuration pair : key[%s], value[%s]", entry.getKey(), entry.getValue());
+ throw new IllegalArgumentException("Unsupported Flink configuration pair : key["
+ + entry.getKey() + "], value[" + entry.getValue() + "]");
+ }
+
+ String onYarnConfigOptionKey = FlinkOnYarnConfigMapping.flinkOnYarnConfigMap.get(entry.getKey());
+
+ sb.append(" ").append(onYarnConfigOptionKey).append(" ").append(entry.getValue());
+ }
+
+ sb.append(" -c org.apache.kylin.common.util.FlinkEntry %s %s ");
+ final String cmd = String.format(Locale.ROOT, sb.toString(), hadoopConf,
+ KylinConfig.getFlinkHome(), jars, formatArgs());
+ logger.info("cmd: " + cmd);
+ final ExecutorService executorService = Executors.newSingleThreadExecutor();
+ final CliCommandExecutor exec = new CliCommandExecutor();
+ final PatternedLogger patternedLogger = new PatternedLogger(logger, new PatternedLogger.ILogListener() {
+ @Override
+ public void onLogEvent(String infoKey, Map<String, String> info) {
+ // only these three properties are of interest here
+ if (ExecutableConstants.FLINK_JOB_ID.equals(infoKey)
+ || ExecutableConstants.YARN_APP_ID.equals(infoKey)
+ || ExecutableConstants.YARN_APP_URL.equals(infoKey)) {
+ getManager().addJobInfo(getId(), info);
+ }
+ }
+ });
+
+ Callable<Pair<Integer, String>> callable = new Callable<Pair<Integer, String>>() {
+ @Override
+ public Pair<Integer, String> call() throws Exception {
+ Pair<Integer, String> result;
+ try {
+ result = exec.execute(cmd, patternedLogger);
+ } catch (Exception e) {
+ logger.error("error run Flink job:", e);
+ result = new Pair<>(-1, e.getMessage());
+ }
+ return result;
+ }
+ };
+
+ try {
+ Future<Pair<Integer, String>> future = executorService.submit(callable);
+ Pair<Integer, String> result = null;
+ while (!isDiscarded() && !isPaused()) {
+ if (future.isDone()) {
+ result = future.get();
+ break;
+ } else {
+ Thread.sleep(5000);
+ }
+ }
+
+ if (!future.isDone()) { // user cancelled
+ executorService.shutdownNow(); // interrupt
+ extra = manager.getOutput(getId()).getExtra();
+ if (extra != null && extra.get(ExecutableConstants.FLINK_JOB_ID) != null) {
+ killAppRetry(extra.get(ExecutableConstants.FLINK_JOB_ID));
+ }
+
+ if (isDiscarded()) {
+ return new ExecuteResult(ExecuteResult.State.DISCARDED, "Discarded");
+ }
+ if (isPaused()) {
+ return new ExecuteResult(ExecuteResult.State.STOPPED, "Stopped");
+ }
+
+ throw new IllegalStateException("job was neither discarded nor paused, but the submit task did not finish");
+ }
+
+ if (result == null) {
+ result = future.get();
+ }
+ if (result != null && result.getFirst() == 0) {
+ // done, update all properties
+ Map<String, String> joblogInfo = patternedLogger.getInfo();
+ // read counter from hdfs
+ String counterOutput = getParam(BatchConstants.ARG_COUNTER_OUPUT);
+ if (counterOutput != null) {
+ if (HadoopUtil.getWorkingFileSystem().exists(new Path(counterOutput))) {
+ Map<String, String> counterMap = HadoopUtil.readFromSequenceFile(counterOutput);
+ joblogInfo.putAll(counterMap);
+ } else {
+ logger.warn("Flink counter output path not exists: " + counterOutput);
+ }
+ }
+ readCounters(joblogInfo);
+ getManager().addJobInfo(getId(), joblogInfo);
+ return new ExecuteResult(ExecuteResult.State.SUCCEED, patternedLogger.getBufferedLog());
+ }
+ // clear FLINK_JOB_ID on job failure.
+ extra = manager.getOutput(getId()).getExtra();
+ extra.put(ExecutableConstants.FLINK_JOB_ID, "");
+ getManager().addJobInfo(getId(), extra);
+ return new ExecuteResult(ExecuteResult.State.ERROR, result != null ? result.getSecond() : "");
+ } catch (Exception e) {
+ logger.error("error run Flink job:", e);
+ return ExecuteResult.createError(e);
+ }
+ }
+ }
+
+ private ExecuteResult onResumed(String appId, ExecutableManager mgr) throws ExecuteException {
+ Map<String, String> info = new HashMap<>();
+ try {
+ logger.info("flink_job_id:" + appId + " resumed");
+ info.put(ExecutableConstants.FLINK_JOB_ID, appId);
+
+ while (!isPaused() && !isDiscarded()) {
+ String status = getAppState(appId);
+
+ if (status.equals("FAILED") || status.equals("KILLED")) {
+ mgr.updateJobOutput(getId(), ExecutableState.ERROR, null, appId + " has failed");
+ return new ExecuteResult(ExecuteResult.State.FAILED, appId + " has failed");
+ }
+
+ if (status.equals("SUCCEEDED")) {
+ mgr.addJobInfo(getId(), info);
+ return new ExecuteResult(ExecuteResult.State.SUCCEED, appId + " has finished");
+ }
+
+ Thread.sleep(5000);
+
+ }
+
+ killAppRetry(appId);
+
+ if (isDiscarded()) {
+ return new ExecuteResult(ExecuteResult.State.DISCARDED, appId + " is discarded");
+ } else {
+ return new ExecuteResult(ExecuteResult.State.STOPPED, appId + " is stopped");
+ }
+ } catch (Exception e) {
+ logger.error("error run spark job:", e);
+ return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+ }
+ }
+
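+ // Polls YARN via "yarn application -status <appId>"; PatternedLogger parses
+ // the command output and exposes the application state in its info map.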
+ private String getAppState(String appId) throws IOException {
+ CliCommandExecutor executor = KylinConfig.getInstanceFromEnv().getCliCommandExecutor();
+ PatternedLogger patternedLogger = new PatternedLogger(logger);
+ String stateCmd = String.format(Locale.ROOT, "yarn application -status %s", appId);
+ executor.execute(stateCmd, patternedLogger);
+ Map<String, String> info = patternedLogger.getInfo();
+ return info.get(ExecutableConstants.YARN_APP_STATE);
+ }
+
+ private void setAlgorithmLayer() {
+ ExecutableManager execMgr = ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv());
+ CubingJob cubingJob = (CubingJob) execMgr.getJob(this.getParam(JOB_ID));
+ cubingJob.setAlgorithm(CubingJob.AlgorithmEnum.LAYER);
+ }
+
+ private void dumpMetadata(CubeSegment segment, List<CubeSegment> mergingSeg) throws ExecuteException {
+ try {
+ if (mergingSeg == null || mergingSeg.isEmpty()) {
+ attachSegmentMetadataWithDict(segment);
+ } else {
+ List<CubeSegment> allRelatedSegs = new ArrayList<>();
+ allRelatedSegs.add(segment);
+ allRelatedSegs.addAll(mergingSeg);
+ attachSegmentsMetadataWithDict(allRelatedSegs);
+ }
+ } catch (IOException e) {
+ throw new ExecuteException("meta dump failed");
+ }
+ }
+
+ private void attachSegmentMetadataWithDict(CubeSegment segment) throws IOException {
+ Set<String> dumpList = new LinkedHashSet<>();
+ dumpList.addAll(JobRelatedMetaUtil.collectCubeMetadata(segment.getCubeInstance()));
+ dumpList.addAll(segment.getDictionaryPaths());
+ ResourceStore rs = ResourceStore.getStore(segment.getConfig());
+ if (rs.exists(segment.getStatisticsResourcePath())) {
+ // cube statistics is not available for new segment
+ dumpList.add(segment.getStatisticsResourcePath());
+ }
+
+ JobRelatedMetaUtil.dumpAndUploadKylinPropsAndMetadata(dumpList, (KylinConfigExt) segment.getConfig(),
+ this.getParam(FlinkCubingByLayer.OPTION_META_URL.getOpt()));
+ }
+
+ private void attachSegmentsMetadataWithDict(List<CubeSegment> segments) throws IOException {
+ Set<String> dumpList = new LinkedHashSet<>();
+ dumpList.addAll(JobRelatedMetaUtil.collectCubeMetadata(segments.get(0).getCubeInstance()));
+ ResourceStore rs = ResourceStore.getStore(segments.get(0).getConfig());
+ for (CubeSegment segment : segments) {
+ dumpList.addAll(segment.getDictionaryPaths());
+ if (rs.exists(segment.getStatisticsResourcePath())) {
+ // cube statistics is not available for new segment
+ dumpList.add(segment.getStatisticsResourcePath());
+ }
+ }
+
+ JobRelatedMetaUtil.dumpAndUploadKylinPropsAndMetadata(dumpList, (KylinConfigExt) segments.get(0).getConfig(),
+ this.getParam(FlinkCubingByLayer.OPTION_META_URL.getOpt()));
+ }
+
+ private int killAppRetry(String appId) throws IOException, InterruptedException {
+ String state = getAppState(appId);
+ if ("SUCCEEDED".equals(state) || "FAILED".equals(state) || "KILLED".equals(state)) {
+ logger.warn(appId + "is final state, no need to kill");
+ return 0;
+ }
+
+ killApp(appId);
+
+ state = getAppState(appId);
+ int retry = 0;
+ while ((state == null || !"KILLED".equals(state)) && retry < 5) {
+ killApp(appId);
+
+ Thread.sleep(1000);
+
+ state = getAppState(appId);
+ retry++;
+ }
+
+ if ("KILLED".equals(state)) {
+ logger.info(appId + " killed successfully");
+ return 0;
+ } else {
+ logger.info(appId + " killed failed");
+ return 1;
+ }
+ }
+
+ private void killApp(String appId) throws IOException, InterruptedException {
+ CliCommandExecutor executor = KylinConfig.getInstanceFromEnv().getCliCommandExecutor();
+ String killCmd = String.format(Locale.ROOT, "yarn application -kill %s", appId);
+ executor.execute(killCmd);
+ }
+
+ private void readCounters(final Map<String, String> info) {
+ String counterSaveAs = getCounterSaveAs();
+ if (counterSaveAs != null) {
+ String[] saveAsNames = counterSaveAs.split(",");
+ saveCounterAs(info.get(ExecutableConstants.SOURCE_RECORDS_COUNT), saveAsNames, 0, info);
+ saveCounterAs(info.get(ExecutableConstants.SOURCE_RECORDS_SIZE), saveAsNames, 1, info);
+ saveCounterAs(info.get(ExecutableConstants.HDFS_BYTES_WRITTEN), saveAsNames, 2, info);
+ }
+ }
+
+ private void saveCounterAs(String counter, String[] saveAsNames, int i, Map<String, String> info) {
+ if (saveAsNames.length > i && !StringUtils.isBlank(saveAsNames[i])) {
+ info.put(saveAsNames[i].trim(), counter);
+ }
+ }
+
+}
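For reference, the submit command assembled by doWork() has roughly this shape
(paths and memory settings are illustrative, not taken from this commit; the
command is wrapped here for readability):

    export HADOOP_CONF_DIR=/etc/hadoop/conf && \
    export HADOOP_CLASSPATH=/usr/hdp/2.4.0.0-169/hadoop/ && \
    /opt/flink/bin/flink run -m yarn-cluster -yjm 2G -ys 2 \
        -c org.apache.kylin.common.util.FlinkEntry <jars> \
        -className org.apache.kylin.engine.flink.FlinkCubingByLayer \
        <application options produced by formatArgs()>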
diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkOnYarnConfigMapping.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkOnYarnConfigMapping.java
new file mode 100644
index 0000000..105e9ec
--- /dev/null
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkOnYarnConfigMapping.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.engine.flink;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * A utility class that builds the mapping between Flink config option keys
+ * (as found in flink-conf.yaml) and the corresponding Flink-on-YARN command
+ * line options.
+ */
+public class FlinkOnYarnConfigMapping {
+
+ public static final Map<String, String> flinkOnYarnConfigMap;
+
+ static {
+ flinkOnYarnConfigMap = new HashMap<>();
+
+ //mapping job manager heap size -> -yjm
+ ConfigOption<String> jmHeapSizeOption = JobManagerOptions.JOB_MANAGER_HEAP_MEMORY;
+ flinkOnYarnConfigMap.put(jmHeapSizeOption.key(), "-yjm");
+ if (jmHeapSizeOption.hasDeprecatedKeys()) {
+ for (String deprecatedKey : jmHeapSizeOption.deprecatedKeys()) {
+ flinkOnYarnConfigMap.put(deprecatedKey, "-yjm");
+ }
+ }
+
+ //mapping task manager memory size -> -ytm
+ ConfigOption<String> tmHeapSizeOption = TaskManagerOptions.MANAGED_MEMORY_SIZE;
+ flinkOnYarnConfigMap.put(tmHeapSizeOption.key(), "-ytm");
+ if (tmHeapSizeOption.hasDeprecatedKeys()) {
+ for (String deprecatedKey : tmHeapSizeOption.deprecatedKeys()) {
+ flinkOnYarnConfigMap.put(deprecatedKey, "-ytm");
+ }
+ }
+
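+ //mapping number of task slots -> -ys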
+ ConfigOption<Integer> taskSlotNumOption = TaskManagerOptions.NUM_TASK_SLOTS;
+ flinkOnYarnConfigMap.put(taskSlotNumOption.key(), "-ys");
+ if (taskSlotNumOption.hasDeprecatedKeys()) {
+ for (String deprecatedKey : taskSlotNumOption.deprecatedKeys()) {
+ flinkOnYarnConfigMap.put(deprecatedKey, "-ys");
+ }
+ }
+ }
+
+}
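To illustrate how this mapping is consumed (mirroring the loop in
FlinkExecutable#doWork), a minimal sketch; the kylin.properties prefix that
feeds getFlinkConfigOverride() is assumed here, not defined in this commit:

    // e.g. kylin.engine.flink-conf.jobmanager.heap.size=2G (prefix assumed)
    Map<String, String> overrides = new HashMap<>();
    overrides.put("jobmanager.heap.size", "2G");
    overrides.put("taskmanager.numberOfTaskSlots", "2");

    StringBuilder cmd = new StringBuilder("flink run -m yarn-cluster");
    for (Map.Entry<String, String> e : overrides.entrySet()) {
        String yarnOpt = FlinkOnYarnConfigMapping.flinkOnYarnConfigMap.get(e.getKey());
        if (yarnOpt == null) {
            throw new IllegalArgumentException("Unsupported Flink configuration: " + e.getKey());
        }
        cmd.append(' ').append(yarnOpt).append(' ').append(e.getValue());
    }
    // cmd is now e.g. "flink run -m yarn-cluster -yjm 2G -ys 2"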