You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2019/03/18 03:12:16 UTC
[kylin] branch engine-flink updated: KYLIN-3869 Implement Flink batch merge job builder
This is an automated email from the ASF dual-hosted git repository.
shaofengshi pushed a commit to branch engine-flink
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/engine-flink by this push:
new e9b520b KYLIN-3869 Implement Flink batch merge job builder
e9b520b is described below
commit e9b520b3c8c89419f757a2b7fcf6a34a66ae4acb
Author: yanghua <ya...@gmail.com>
AuthorDate: Sat Mar 16 20:10:12 2019 +0800
KYLIN-3869 Implement Flink batch merge job builder
---
.../engine/flink/FlinkBatchCubingEngine2.java | 68 +++++++++++
.../engine/flink/FlinkBatchMergeJobBuilder2.java | 134 +++++++++++++++++++++
2 files changed, 202 insertions(+)
diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkBatchCubingEngine2.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkBatchCubingEngine2.java
new file mode 100644
index 0000000..abd57b0
--- /dev/null
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkBatchCubingEngine2.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.engine.flink;
+
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
+import org.apache.kylin.engine.IBatchCubingEngine;
+import org.apache.kylin.engine.mr.BatchOptimizeJobBuilder2;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+
+/**
+ * Batch cubing engine whose cubing and merge jobs are assembled by the Flink job builders.
+ *
+ * <p>Optimize jobs are built by the MapReduce {@link BatchOptimizeJobBuilder2}. The engine reports
+ * {@link IFlinkInput} as its source interface and {@link IFlinkOutput} as its storage interface.
+ */
+public class FlinkBatchCubingEngine2 implements IBatchCubingEngine {
+
+    /**
+     * Creates the joined flat table descriptor for a whole cube.
+     *
+     * @param cubeDesc the cube descriptor
+     * @return a new {@link CubeJoinedFlatTableDesc} built from {@code cubeDesc}
+     */
+    @Override
+    public IJoinedFlatTableDesc getJoinedFlatTableDesc(CubeDesc cubeDesc) {
+        final IJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(cubeDesc);
+        return flatTableDesc;
+    }
+
+    /**
+     * Creates the joined flat table descriptor for a single segment.
+     *
+     * @param newSegment the segment being built
+     * @return a new {@link CubeJoinedFlatTableDesc} built from {@code newSegment}
+     */
+    @Override
+    public IJoinedFlatTableDesc getJoinedFlatTableDesc(CubeSegment newSegment) {
+        final IJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(newSegment);
+        return flatTableDesc;
+    }
+
+    /**
+     * Builds the job that cubes {@code newSegment} using {@link FlinkBatchCubingJobBuilder2}.
+     *
+     * @param newSegment the segment to build
+     * @param submitter the user submitting the job
+     * @return the assembled cubing job
+     */
+    @Override
+    public DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter) {
+        final FlinkBatchCubingJobBuilder2 cubingBuilder = new FlinkBatchCubingJobBuilder2(newSegment, submitter);
+        return cubingBuilder.build();
+    }
+
+    /**
+     * Builds the job that merges segments into {@code mergeSegment} using
+     * {@link FlinkBatchMergeJobBuilder2}.
+     *
+     * @param mergeSegment the target segment of the merge
+     * @param submitter the user submitting the job
+     * @return the assembled merge job
+     */
+    @Override
+    public DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter) {
+        final FlinkBatchMergeJobBuilder2 mergeBuilder = new FlinkBatchMergeJobBuilder2(mergeSegment, submitter);
+        return mergeBuilder.build();
+    }
+
+    /**
+     * Builds the optimize job for {@code optimizeSegment}. This reuses the MapReduce
+     * {@link BatchOptimizeJobBuilder2}; no Flink-specific optimize builder is used here.
+     *
+     * @param optimizeSegment the segment to optimize
+     * @param submitter the user submitting the job
+     * @return the assembled optimize job
+     */
+    @Override
+    public DefaultChainedExecutable createBatchOptimizeJob(CubeSegment optimizeSegment, String submitter) {
+        final BatchOptimizeJobBuilder2 optimizeBuilder = new BatchOptimizeJobBuilder2(optimizeSegment, submitter);
+        return optimizeBuilder.build();
+    }
+
+    /** @return {@link IFlinkInput}, the source interface reported by this engine */
+    @Override
+    public Class<?> getSourceInterface() {
+        final Class<?> sourceInterface = IFlinkInput.class;
+        return sourceInterface;
+    }
+
+    /** @return {@link IFlinkOutput}, the storage interface reported by this engine */
+    @Override
+    public Class<?> getStorageInterface() {
+        final Class<?> storageInterface = IFlinkOutput.class;
+        return storageInterface;
+    }
+
+}
diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkBatchMergeJobBuilder2.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkBatchMergeJobBuilder2.java
new file mode 100644
index 0000000..155ef47
--- /dev/null
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkBatchMergeJobBuilder2.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.engine.flink;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.kylin.common.util.StringUtil;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.MapReduceExecutable;
+import org.apache.kylin.engine.mr.steps.MergeDictionaryJob;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Builds the batch job that merges a cube's segments into one target segment, running the
+ * cuboid-merge step on Apache Flink.
+ *
+ * <p>Dictionary merging is still done by a MapReduce step ({@link MergeDictionaryJob}). The
+ * Flink input and output sides add their own steps to each phase.
+ */
+public class FlinkBatchMergeJobBuilder2 extends JobBuilderSupport {
+
+    private static final Logger logger = LoggerFactory.getLogger(FlinkBatchMergeJobBuilder2.class);
+
+    private final IFlinkOutput.IFlinkBatchMergeOutputSide outputSide;
+    private final IFlinkInput.IFlinkBatchMergeInputSide inputSide;
+
+    /**
+     * @param mergeSegment the target segment that the merging segments are merged into
+     * @param submitter the user submitting the job
+     */
+    public FlinkBatchMergeJobBuilder2(CubeSegment mergeSegment, String submitter) {
+        super(mergeSegment, submitter);
+        this.outputSide = FlinkUtil.getBatchMergeOutputSide2(seg);
+        this.inputSide = FlinkUtil.getBatchMergeInputSide(seg);
+    }
+
+    /**
+     * Assembles the merge job in three phases:
+     * <ol>
+     * <li>merge dictionaries,</li>
+     * <li>merge cuboid data (on Flink), then the output side's cube-file merge,</li>
+     * <li>update metadata and clean up.</li>
+     * </ol>
+     *
+     * @return the chained merge job
+     * @throws IllegalStateException if fewer than two segments are being merged
+     */
+    public CubingJob build() {
+        logger.info("Flink job to MERGE segment {}", seg);
+
+        final CubeSegment cubeSegment = seg;
+        final CubingJob result = CubingJob.createMergeJob(cubeSegment, submitter, config);
+        final String jobId = result.getId();
+
+        final List<CubeSegment> mergingSegments = cubeSegment.getCubeInstance().getMergingSegments(cubeSegment);
+        // A merge needs at least two source segments; exactly two is valid.
+        Preconditions.checkState(mergingSegments.size() > 1,
+                "there should be at least 2 segments to merge, target segment %s", cubeSegment);
+        final List<String> mergingSegmentIds = Lists.newArrayList();
+        for (CubeSegment merging : mergingSegments) {
+            mergingSegmentIds.add(merging.getUuid());
+        }
+
+        // Phase 1: Merge Dictionary
+        inputSide.addStepPhase1_MergeDictionary(result);
+        result.addTask(createMergeDictionaryMRStep(cubeSegment, jobId, mergingSegmentIds));
+        result.addTask(createUpdateDictionaryStep(cubeSegment, jobId, mergingSegmentIds));
+        outputSide.addStepPhase1_MergeDictionary(result);
+
+        // Merge the cuboid data of all merging segments on Flink
+        result.addTask(createMergeCuboidDataFlinkStep(cubeSegment, mergingSegments, jobId));
+
+        // Phase 2: Merge Cube Files
+        outputSide.addStepPhase2_BuildCube(cubeSegment, mergingSegments, result);
+
+        // Phase 3: Update Metadata & Cleanup
+        result.addTask(createUpdateCubeInfoAfterMergeStep(mergingSegmentIds, jobId));
+        outputSide.addStepPhase3_Cleanup(result);
+
+        return result;
+    }
+
+    /**
+     * Creates the MapReduce step that runs {@link MergeDictionaryJob} for the merging segments.
+     *
+     * @param seg the target segment of the merge
+     * @param jobID id of the enclosing merge job; used to derive metadata, dictionary and statistics paths
+     * @param mergingSegmentIds UUIDs of the segments being merged, passed comma-joined
+     * @return the configured MapReduce step
+     */
+    public MapReduceExecutable createMergeDictionaryMRStep(CubeSegment seg, String jobID, List<String> mergingSegmentIds) {
+        MapReduceExecutable mergeDictionaryStep = new MapReduceExecutable();
+        mergeDictionaryStep.setName(ExecutableConstants.STEP_NAME_MERGE_DICTIONARY);
+        StringBuilder cmd = new StringBuilder();
+        appendMapReduceParameters(cmd, JobEngineConfig.IN_MEM_JOB_CONF_SUFFIX);
+
+        appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getCubeInstance().getName());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
+        // The metadata URL is passed exactly once (it was previously duplicated).
+        appendExecCmdParameters(cmd, BatchConstants.ARG_META_URL, getSegmentMetadataUrl(seg.getConfig(), jobID));
+        appendExecCmdParameters(cmd, MergeDictionaryJob.OPTION_MERGE_SEGMENT_IDS.getOpt(), StringUtil.join(mergingSegmentIds, ","));
+        appendExecCmdParameters(cmd, MergeDictionaryJob.OPTION_OUTPUT_PATH_DICT.getOpt(), getDictInfoPath(jobID));
+        appendExecCmdParameters(cmd, MergeDictionaryJob.OPTION_OUTPUT_PATH_STAT.getOpt(), getStatisticsPath(jobID));
+        appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Merge_Dictionary_" + seg.getCubeInstance().getName() + "_Step");
+
+        mergeDictionaryStep.setMapReduceParams(cmd.toString());
+        mergeDictionaryStep.setMapReduceJobClass(MergeDictionaryJob.class);
+
+        return mergeDictionaryStep;
+    }
+
+    /**
+     * Creates the Flink step that runs {@link FlinkCubingMerge} over the cuboid data of the
+     * merging segments, writing to this job's cuboid root path.
+     *
+     * @param seg the target segment of the merge
+     * @param mergingSegments the segments whose cuboid roots are the (comma-joined) input
+     * @param jobID id of the enclosing merge job; used for the output path and metadata URL
+     * @return the configured Flink step
+     */
+    public FlinkExecutable createMergeCuboidDataFlinkStep(CubeSegment seg, List<CubeSegment> mergingSegments, String jobID) {
+        final List<String> mergingCuboidPaths = Lists.newArrayList();
+        for (CubeSegment merging : mergingSegments) {
+            mergingCuboidPaths.add(getCuboidRootPath(merging));
+        }
+        String formattedPath = StringUtil.join(mergingCuboidPaths, ",");
+        String outputPath = getCuboidRootPath(jobID);
+
+        final FlinkExecutable flinkExecutable = new FlinkExecutable();
+        flinkExecutable.setClassName(FlinkCubingMerge.class.getName());
+        flinkExecutable.setParam(FlinkCubingMerge.OPTION_CUBE_NAME.getOpt(), seg.getRealization().getName());
+        flinkExecutable.setParam(FlinkCubingMerge.OPTION_SEGMENT_ID.getOpt(), seg.getUuid());
+        flinkExecutable.setParam(FlinkCubingMerge.OPTION_INPUT_PATH.getOpt(), formattedPath);
+        flinkExecutable.setParam(FlinkCubingMerge.OPTION_META_URL.getOpt(), getSegmentMetadataUrl(seg.getConfig(), jobID));
+        flinkExecutable.setParam(FlinkCubingMerge.OPTION_OUTPUT_PATH.getOpt(), outputPath);
+
+        flinkExecutable.setJobId(jobID);
+        flinkExecutable.setName(ExecutableConstants.STEP_NAME_MERGE_CUBOID);
+
+        // Extra jars come from the config via StringUtil.appendWithSeparator on an empty builder.
+        StringBuilder jars = new StringBuilder();
+        StringUtil.appendWithSeparator(jars, seg.getConfig().getFlinkAdditionalJars());
+        flinkExecutable.setJars(jars.toString());
+
+        return flinkExecutable;
+    }
+
+}