You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ni...@apache.org on 2019/12/23 03:14:26 UTC
[kylin] 13/30: KYLIN-3851 Flink cubing step : merge dictionary
This is an automated email from the ASF dual-hosted git repository.
nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit c54d455bd432f531d3b2350bd859b5d55ea05d01
Author: yanghua <ya...@gmail.com>
AuthorDate: Wed Mar 20 20:39:29 2019 +0800
KYLIN-3851 Flink cubing step : merge dictionary
---
.../engine/flink/FlinkBatchMergeJobBuilder2.java | 46 ++--
.../kylin/engine/flink/FlinkMergingDictionary.java | 297 +++++++++++++++++++++
.../engine/mr/steps/UpdateDictionaryStep.java | 2 +-
3 files changed, 320 insertions(+), 25 deletions(-)
diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkBatchMergeJobBuilder2.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkBatchMergeJobBuilder2.java
index 155ef47..8a5635f 100644
--- a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkBatchMergeJobBuilder2.java
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkBatchMergeJobBuilder2.java
@@ -23,11 +23,7 @@ import org.apache.kylin.common.util.StringUtil;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.mr.CubingJob;
import org.apache.kylin.engine.mr.JobBuilderSupport;
-import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.engine.mr.common.MapReduceExecutable;
-import org.apache.kylin.engine.mr.steps.MergeDictionaryJob;
import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.job.engine.JobEngineConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -66,7 +62,7 @@ public class FlinkBatchMergeJobBuilder2 extends JobBuilderSupport {
// Phase 1: Merge Dictionary
inputSide.addStepPhase1_MergeDictionary(result);
- result.addTask(createMergeDictionaryMRStep(cubeSegment, jobId, mergingSegmentIds));
+ result.addTask(createMergeDictionaryFlinkStep(cubeSegment, jobId, mergingSegmentIds));
result.addTask(createUpdateDictionaryStep(cubeSegment, jobId, mergingSegmentIds));
outputSide.addStepPhase1_MergeDictionary(result);
@@ -83,25 +79,27 @@ public class FlinkBatchMergeJobBuilder2 extends JobBuilderSupport {
return result;
}
- public MapReduceExecutable createMergeDictionaryMRStep(CubeSegment seg, String jobID, List<String> mergingSegmentIds) {
- MapReduceExecutable mergeDictionaryStep = new MapReduceExecutable();
- mergeDictionaryStep.setName(ExecutableConstants.STEP_NAME_MERGE_DICTIONARY);
- StringBuilder cmd = new StringBuilder();
- appendMapReduceParameters(cmd, JobEngineConfig.IN_MEM_JOB_CONF_SUFFIX);
-
- appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getCubeInstance().getName());
- appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
- appendExecCmdParameters(cmd, BatchConstants.ARG_META_URL, getSegmentMetadataUrl(seg.getConfig(), jobID));
- appendExecCmdParameters(cmd, BatchConstants.ARG_META_URL, getSegmentMetadataUrl(seg.getConfig(), jobID));
- appendExecCmdParameters(cmd, MergeDictionaryJob.OPTION_MERGE_SEGMENT_IDS.getOpt(), StringUtil.join(mergingSegmentIds, ","));
- appendExecCmdParameters(cmd, MergeDictionaryJob.OPTION_OUTPUT_PATH_DICT.getOpt(), getDictInfoPath(jobID));
- appendExecCmdParameters(cmd, MergeDictionaryJob.OPTION_OUTPUT_PATH_STAT.getOpt(), getStatisticsPath(jobID));
- appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Merge_Dictionary_" + seg.getCubeInstance().getName() + "_Step");
-
- mergeDictionaryStep.setMapReduceParams(cmd.toString());
- mergeDictionaryStep.setMapReduceJobClass(MergeDictionaryJob.class);
-
- return mergeDictionaryStep;
+ public FlinkExecutable createMergeDictionaryFlinkStep(CubeSegment seg, String jobID, List<String> mergingSegmentIds) {
+ final FlinkExecutable flinkExecutable = new FlinkExecutable();
+ flinkExecutable.setClassName(FlinkMergingDictionary.class.getName());
+
+ flinkExecutable.setParam(FlinkMergingDictionary.OPTION_CUBE_NAME.getOpt(), seg.getRealization().getName());
+ flinkExecutable.setParam(FlinkMergingDictionary.OPTION_SEGMENT_ID.getOpt(), seg.getUuid());
+ flinkExecutable.setParam(FlinkMergingDictionary.OPTION_META_URL.getOpt(), getSegmentMetadataUrl(seg.getConfig(), jobID));
+ flinkExecutable.setParam(FlinkMergingDictionary.OPTION_MERGE_SEGMENT_IDS.getOpt(), StringUtil.join(mergingSegmentIds, ","));
+ flinkExecutable.setParam(FlinkMergingDictionary.OPTION_OUTPUT_PATH_DICT.getOpt(), getDictInfoPath(jobID));
+ flinkExecutable.setParam(FlinkMergingDictionary.OPTION_OUTPUT_PATH_STAT.getOpt(), getStatisticsPath(jobID));
+
+ flinkExecutable.setJobId(jobID);
+ flinkExecutable.setName(ExecutableConstants.STEP_NAME_MERGE_DICTIONARY);
+ flinkExecutable.setFlinkConfigName(ExecutableConstants.FLINK_SPECIFIC_CONFIG_NAME_MERGE_DICTIONARY);
+
+ StringBuilder jars = new StringBuilder();
+
+ StringUtil.appendWithSeparator(jars, seg.getConfig().getFlinkAdditionalJars());
+ flinkExecutable.setJars(jars.toString());
+
+ return flinkExecutable;
}
public FlinkExecutable createMergeCuboidDataFlinkStep(CubeSegment seg, List<CubeSegment> mergingSegments, String jobID) {
diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkMergingDictionary.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkMergingDictionary.java
new file mode 100644
index 0000000..3839535
--- /dev/null
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkMergingDictionary.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.engine.flink;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
+import org.apache.flink.api.java.operators.DataSource;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.util.AbstractApplication;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.OptionsHelper;
+import org.apache.kylin.common.util.StringUtil;
+import org.apache.kylin.cube.CubeDescManager;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.dict.DictionaryInfo;
+import org.apache.kylin.dict.DictionaryManager;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.CubeStatsWriter;
+import org.apache.kylin.engine.mr.common.SerializableConfiguration;
+import org.apache.kylin.measure.hllc.HLLCounter;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.InputStream;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Flink Cubing merge step : merge dictionary.
+ */
+public class FlinkMergingDictionary extends AbstractApplication implements Serializable {
+
+ protected static final Logger logger = LoggerFactory.getLogger(FlinkMergingDictionary.class);
+
+ public static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg()
+ .isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME);
+ public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segmentId").hasArg().isRequired(true)
+ .withDescription("Cube Segment Id").create("segmentId");
+ public static final Option OPTION_META_URL = OptionBuilder.withArgName("metaUrl").hasArg().isRequired(true)
+ .withDescription("HDFS metadata url").create("metaUrl");
+ public static final Option OPTION_MERGE_SEGMENT_IDS = OptionBuilder.withArgName("segmentIds").hasArg()
+ .isRequired(true).withDescription("Merging Cube Segment Ids").create("segmentIds");
+ public static final Option OPTION_OUTPUT_PATH_DICT = OptionBuilder.withArgName("dictOutputPath").hasArg()
+ .isRequired(true).withDescription("merged dictionary resource path").create("dictOutputPath");
+ public static final Option OPTION_OUTPUT_PATH_STAT = OptionBuilder.withArgName("statOutputPath").hasArg()
+ .isRequired(true).withDescription("merged statistics resource path").create("statOutputPath");
+
+ private Options options;
+
+ public FlinkMergingDictionary() {
+ options = new Options();
+ options.addOption(OPTION_CUBE_NAME);
+ options.addOption(OPTION_SEGMENT_ID);
+ options.addOption(OPTION_META_URL);
+ options.addOption(OPTION_MERGE_SEGMENT_IDS);
+ options.addOption(OPTION_OUTPUT_PATH_DICT);
+ options.addOption(OPTION_OUTPUT_PATH_STAT);
+ }
+
+ @Override
+ protected Options getOptions() {
+ return options;
+ }
+
+ @Override
+ protected void execute(OptionsHelper optionsHelper) throws Exception {
+ final String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
+ final String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
+ final String metaUrl = optionsHelper.getOptionValue(OPTION_META_URL);
+ final String segmentIds = optionsHelper.getOptionValue(OPTION_MERGE_SEGMENT_IDS);
+ final String dictOutputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH_DICT);
+ final String statOutputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH_STAT);
+
+ final Job job = Job.getInstance();
+
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ HadoopUtil.deletePath(job.getConfiguration(), new Path(dictOutputPath));
+
+ final SerializableConfiguration sConf = new SerializableConfiguration(job.getConfiguration());
+ final KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
+
+ final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);
+ final CubeDesc cubeDesc = CubeDescManager.getInstance(envConfig).getCubeDesc(cubeInstance.getDescName());
+
+ logger.info("Dictionary output path: {}", dictOutputPath);
+ logger.info("Statistics output path: {}", statOutputPath);
+
+ final TblColRef[] tblColRefs = cubeDesc.getAllColumnsNeedDictionaryBuilt().toArray(new TblColRef[0]);
+ final int columnLength = tblColRefs.length;
+
+ List<Integer> indexs = Lists.newArrayListWithCapacity(columnLength);
+
+ for (int i = 0; i <= columnLength; i++) {
+ indexs.add(i);
+ }
+
+ DataSource<Integer> indexDS = env.fromCollection(indexs);
+
+ DataSet<Tuple2<Text, Text>> colToDictPathDS = indexDS.map(new MergeDictAndStatsFunction(cubeName,
+ metaUrl, segmentId, StringUtil.splitByComma(segmentIds), statOutputPath, tblColRefs, sConf))
+ .setParallelism(columnLength + 1);
+
+ FlinkUtil.setHadoopConfForCuboid(job, null, null);
+ HadoopOutputFormat<Text, Text> hadoopOF =
+ new HadoopOutputFormat<>(new SequenceFileOutputFormat<>(), job);
+ SequenceFileOutputFormat.setOutputPath(job, new Path(dictOutputPath));
+
+ colToDictPathDS.output(hadoopOF).setParallelism(1);
+
+ env.execute("Merge dictionary for cube:" + cubeName + ", segment " + segmentId);
+ }
+
+ public static class MergeDictAndStatsFunction extends RichMapFunction<Integer, Tuple2<Text, Text>> {
+
+ private String cubeName;
+ private String metaUrl;
+ private String segmentId;
+ private String[] segmentIds;
+ private String statOutputPath;
+ private TblColRef[] tblColRefs;
+ private SerializableConfiguration conf;
+ private transient DictionaryManager dictMgr;
+ private KylinConfig kylinConfig;
+ private List<CubeSegment> mergingSegments;
+
+ public MergeDictAndStatsFunction(String cubeName, String metaUrl, String segmentId, String[] segmentIds,
+ String statOutputPath, TblColRef[] tblColRefs, SerializableConfiguration conf) {
+ this.cubeName = cubeName;
+ this.metaUrl = metaUrl;
+ this.segmentId = segmentId;
+ this.segmentIds = segmentIds;
+ this.statOutputPath = statOutputPath;
+ this.tblColRefs = tblColRefs;
+ this.conf = conf;
+ }
+
+ @Override
+ public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
+ kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
+ try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
+ .setAndUnsetThreadLocalConfig(kylinConfig)) {
+ CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
+ dictMgr = DictionaryManager.getInstance(kylinConfig);
+ mergingSegments = getMergingSegments(cubeInstance, segmentIds);
+ }
+ }
+
+ @Override
+ public Tuple2<Text, Text> map(Integer index) throws Exception {
+ if (index < tblColRefs.length) {
+ // merge dictionary
+ TblColRef col = tblColRefs[index];
+ List<DictionaryInfo> dictInfos = Lists.newArrayList();
+ try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
+ .setAndUnsetThreadLocalConfig(kylinConfig)) {
+ for (CubeSegment segment : mergingSegments) {
+ if (segment.getDictResPath(col) != null) {
+ DictionaryInfo dictInfo = dictMgr.getDictionaryInfo(segment.getDictResPath(col));
+ if (dictInfo != null && !dictInfos.contains(dictInfo)) {
+ dictInfos.add(dictInfo);
+ }
+ }
+ }
+
+ DictionaryInfo mergedDictInfo = dictMgr.mergeDictionary(dictInfos);
+ String tblCol = col.getTableAlias() + ":" + col.getName();
+ String dictInfoPath = mergedDictInfo == null ? "" : mergedDictInfo.getResourcePath();
+
+ return new Tuple2<>(new Text(tblCol), new Text(dictInfoPath));
+ }
+ } else {
+ // merge statistics
+ try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
+ .setAndUnsetThreadLocalConfig(kylinConfig)) {
+ CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
+ CubeSegment newSegment = cubeInstance.getSegmentById(segmentId);
+ ResourceStore rs = ResourceStore.getStore(kylinConfig);
+
+ Map<Long, HLLCounter> cuboidHLLMap = Maps.newHashMap();
+ Configuration conf = null;
+ int averageSamplingPercentage = 0;
+
+ for (CubeSegment cubeSegment : mergingSegments) {
+ String filePath = cubeSegment.getStatisticsResourcePath();
+
+ File tempFile = File.createTempFile(segmentId, ".seq");
+
+ try(InputStream is = rs.getResource(filePath).content();
+ FileOutputStream tempFileStream = new FileOutputStream(tempFile)) {
+
+ org.apache.commons.io.IOUtils.copy(is, tempFileStream);
+ }
+
+ FileSystem fs = HadoopUtil.getFileSystem("file:///" + tempFile.getAbsolutePath());
+
+ conf = HadoopUtil.getCurrentConfiguration();
+
+ try(SequenceFile.Reader reader = new SequenceFile.Reader(fs, new Path(tempFile.getAbsolutePath()), conf)) {
+ //noinspection deprecation
+ LongWritable key = (LongWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+ BytesWritable value = (BytesWritable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+
+ while (reader.next(key, value)) {
+ if (key.get() == 0L) {
+ // sampling percentage
+ averageSamplingPercentage += Bytes.toInt(value.getBytes());
+ } else if (key.get() > 0) {
+ HLLCounter hll = new HLLCounter(kylinConfig.getCubeStatsHLLPrecision());
+ ByteArray byteArray = new ByteArray(value.getBytes());
+ hll.readRegisters(byteArray.asBuffer());
+
+ if (cuboidHLLMap.get(key.get()) != null) {
+ cuboidHLLMap.get(key.get()).merge(hll);
+ } else {
+ cuboidHLLMap.put(key.get(), hll);
+ }
+ }
+ }
+ }
+ }
+
+ averageSamplingPercentage = averageSamplingPercentage / mergingSegments.size();
+ CubeStatsWriter.writeCuboidStatistics(conf, new Path(statOutputPath), cuboidHLLMap, averageSamplingPercentage);
+ Path statisticsFilePath = new Path(statOutputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
+
+ FileSystem fs = HadoopUtil.getFileSystem(statisticsFilePath, conf);
+ FSDataInputStream fis = fs.open(statisticsFilePath);
+
+ try {
+ // put the statistics to metadata store
+ String statisticsFileName = newSegment.getStatisticsResourcePath();
+ rs.putResource(statisticsFileName, fis, System.currentTimeMillis());
+ } finally {
+ IOUtils.closeStream(fis);
+ }
+
+ return new Tuple2<>(new Text(""), new Text(""));
+ }
+ }
+ }
+
+ private List<CubeSegment> getMergingSegments(CubeInstance cube, String[] segmentIds) {
+ List<CubeSegment> result = Lists.newArrayListWithCapacity(segmentIds.length);
+ for (String id : segmentIds) {
+ result.add(cube.getSegmentById(id));
+ }
+ return result;
+ }
+ }
+
+}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateDictionaryStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateDictionaryStep.java
index a68f215..43b537a 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateDictionaryStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateDictionaryStep.java
@@ -89,7 +89,7 @@ public class UpdateDictionaryStep extends AbstractExecutable {
FileStatus[] fileStatuss = fs.listStatus(new Path(dictInfoPath), new PathFilter() {
@Override
public boolean accept(Path path) {
- return path.getName().startsWith("part");
+ return path.getName().startsWith("part") || path.getName().startsWith("tmp");
}
});