Posted to commits@kylin.apache.org by sh...@apache.org on 2018/08/07 13:52:46 UTC
[kylin] 02/02: KYLIN-3471 Merge dictionary using Spark
This is an automated email from the ASF dual-hosted git repository.
shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 74ac1cd1d2aa45028f38ccac47a5920343756313
Author: chao long <wa...@qq.com>
AuthorDate: Tue Aug 7 20:09:44 2018 +0800
KYLIN-3471 Merge dictionary using Spark
---
.../engine/spark/SparkBatchMergeJobBuilder2.java | 39 ++-
.../apache/kylin/engine/spark/SparkExecutable.java | 7 +-
.../kylin/engine/spark/SparkMergingDictionary.java | 307 +++++++++++++++++++++
3 files changed, 338 insertions(+), 15 deletions(-)
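For orientation (this sketch is not part of the patch): the new Spark step below parallelizes one task per dictionary column plus one extra task for the cube statistics, so a cube with N dictionary columns runs N+1 tasks in a single Spark job. A minimal, self-contained illustration of that index-to-task layout, with a placeholder column name:

import java.util.ArrayList;
import java.util.List;

public class MergeTaskLayoutSketch {
    // Indices 0..N-1 each merge the dictionary of one column; index N merges the
    // HLL statistics of all merging segments (see MergeDictAndStatsFunction below).
    static List<String> describeTasks(String[] dictColumns) {
        List<String> tasks = new ArrayList<>();
        for (int index = 0; index <= dictColumns.length; index++) {
            if (index < dictColumns.length) {
                tasks.add("task " + index + ": merge dictionary for " + dictColumns[index]);
            } else {
                tasks.add("task " + index + ": merge cube statistics");
            }
        }
        return tasks;
    }

    public static void main(String[] args) {
        // placeholder column, for illustration only
        describeTasks(new String[] { "KYLIN_SALES:LSTG_FORMAT_NAME" }).forEach(System.out::println);
    }
}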
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchMergeJobBuilder2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchMergeJobBuilder2.java
index 97861a3..4487610 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchMergeJobBuilder2.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchMergeJobBuilder2.java
@@ -18,12 +18,8 @@
package org.apache.kylin.engine.spark;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.StorageURL;
import org.apache.kylin.common.util.StringUtil;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.mr.CubingJob;
@@ -63,8 +59,8 @@ public class SparkBatchMergeJobBuilder2 extends JobBuilderSupport {
// Phase 1: Merge Dictionary
inputSide.addStepPhase1_MergeDictionary(result);
- result.addTask(createMergeDictionaryStep(mergingSegmentIds));
- result.addTask(createMergeStatisticsStep(cubeSegment, mergingSegmentIds, getStatisticsPath(jobId)));
+ result.addTask(createMergeDictionaryStep(cubeSegment, jobId, mergingSegmentIds));
+ result.addTask(createUpdateDictionaryStep(cubeSegment, jobId, mergingSegmentIds));
outputSide.addStepPhase1_MergeDictionary(result);
// merge cube
@@ -80,6 +76,28 @@ public class SparkBatchMergeJobBuilder2 extends JobBuilderSupport {
return result;
}
+ public SparkExecutable createMergeDictionaryStep(CubeSegment seg, String jobID, List<String> mergingSegmentIds) {
+ final SparkExecutable sparkExecutable = new SparkExecutable();
+ sparkExecutable.setClassName(SparkMergingDictionary.class.getName());
+
+ sparkExecutable.setParam(SparkMergingDictionary.OPTION_CUBE_NAME.getOpt(), seg.getRealization().getName());
+ sparkExecutable.setParam(SparkMergingDictionary.OPTION_SEGMENT_ID.getOpt(), seg.getUuid());
+ sparkExecutable.setParam(SparkMergingDictionary.OPTION_META_URL.getOpt(), getSegmentMetadataUrl(seg.getConfig(), jobID));
+ sparkExecutable.setParam(SparkMergingDictionary.OPTION_MERGE_SEGMENT_IDS.getOpt(), StringUtil.join(mergingSegmentIds, ","));
+ sparkExecutable.setParam(SparkMergingDictionary.OPTION_OUTPUT_PATH_DICT.getOpt(), getDictInfoPath(jobID));
+ sparkExecutable.setParam(SparkMergingDictionary.OPTION_OUTPUT_PATH_STAT.getOpt(), getStatisticsPath(jobID));
+
+ sparkExecutable.setJobId(jobID);
+ sparkExecutable.setName(ExecutableConstants.STEP_NAME_MERGE_DICTIONARY);
+
+ StringBuilder jars = new StringBuilder();
+
+ StringUtil.appendWithSeparator(jars, seg.getConfig().getSparkAdditionalJars());
+ sparkExecutable.setJars(jars.toString());
+
+ return sparkExecutable;
+ }
+
public SparkExecutable createMergeCuboidDataStep(CubeSegment seg, List<CubeSegment> mergingSegments, String jobID) {
final List<String> mergingCuboidPaths = Lists.newArrayList();
@@ -94,8 +112,7 @@ public class SparkBatchMergeJobBuilder2 extends JobBuilderSupport {
sparkExecutable.setParam(SparkCubingMerge.OPTION_CUBE_NAME.getOpt(), seg.getRealization().getName());
sparkExecutable.setParam(SparkCubingMerge.OPTION_SEGMENT_ID.getOpt(), seg.getUuid());
sparkExecutable.setParam(SparkCubingMerge.OPTION_INPUT_PATH.getOpt(), formattedPath);
- sparkExecutable.setParam(SparkCubingMerge.OPTION_META_URL.getOpt(),
- getSegmentMetadataUrl(seg.getConfig(), jobID));
+ sparkExecutable.setParam(SparkCubingMerge.OPTION_META_URL.getOpt(), getSegmentMetadataUrl(seg.getConfig(), jobID));
sparkExecutable.setParam(SparkCubingMerge.OPTION_OUTPUT_PATH.getOpt(), outputPath);
sparkExecutable.setJobId(jobID);
@@ -108,10 +125,4 @@ public class SparkBatchMergeJobBuilder2 extends JobBuilderSupport {
return sparkExecutable;
}
-
- public String getSegmentMetadataUrl(KylinConfig kylinConfig, String jobId) {
- Map<String, String> param = new HashMap<>();
- param.put("path", getDumpMetadataPath(jobId));
- return new StorageURL(kylinConfig.getMetadataUrl().getIdentifier(), "hdfs", param).toString();
- }
}
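For context (not part of the patch): the merge-dictionary step above writes a SequenceFile under dictOutputPath whose Text key is the "TABLE_ALIAS:COLUMN" pair and whose Text value is the resource path of the merged DictionaryInfo (see SparkMergingDictionary below; the statistics task emits an empty pair). A hedged sketch of how a later step, such as the createUpdateDictionaryStep referenced above, might read that file back; the part-file name is an assumption based on the coalesce(1) in the Spark job:

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class MergedDictPathReaderSketch {
    // Returns a map from "TABLE_ALIAS:COLUMN" to the merged dictionary resource path.
    static Map<String, String> readDictPaths(Configuration conf, Path dictOutputDir) throws IOException {
        Map<String, String> colToDictPath = new HashMap<>();
        Path partFile = new Path(dictOutputDir, "part-r-00000"); // assumed single part file
        SequenceFile.Reader reader = null;
        try {
            reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(partFile));
            Text key = new Text();
            Text value = new Text();
            while (reader.next(key, value)) {
                if (key.getLength() > 0) { // skip the empty pair written by the statistics task
                    colToDictPath.put(key.toString(), value.toString());
                }
            }
        } finally {
            IOUtils.closeStream(reader);
        }
        return colToDictPath;
    }
}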
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
index 4635fad..d8eba71 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
@@ -37,6 +37,7 @@ import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.util.Shell;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.KylinConfigExt;
+import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.persistence.ResourceTool;
import org.apache.kylin.common.util.CliCommandExecutor;
import org.apache.kylin.common.util.Pair;
@@ -386,9 +387,13 @@ public class SparkExecutable extends AbstractExecutable {
private void attachSegmentsMetadataWithDict(List<CubeSegment> segments) throws IOException {
Set<String> dumpList = new LinkedHashSet<>();
dumpList.addAll(JobRelatedMetaUtil.collectCubeMetadata(segments.get(0).getCubeInstance()));
+ ResourceStore rs = ResourceStore.getStore(segments.get(0).getConfig());
for (CubeSegment segment : segments) {
dumpList.addAll(segment.getDictionaryPaths());
- dumpList.add(segment.getStatisticsResourcePath());
+ if (rs.exists(segment.getStatisticsResourcePath())) {
+ // cube statistics is not available for new segment
+ dumpList.add(segment.getStatisticsResourcePath());
+ }
}
dumpAndUploadKylinPropsAndMetadata(dumpList, (KylinConfigExt) segments.get(0).getConfig());
}
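Side note (not part of the patch): the rs.exists() guard above is needed because the merge target is a new segment that has not produced statistics yet, so its statistics resource path points at nothing. A minimal sketch of the same defensive-read pattern, using only the ResourceStore calls that appear elsewhere in this commit:

import java.io.InputStream;

import org.apache.hadoop.io.IOUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.cube.CubeSegment;

public class SegmentStatsSketch {
    // Reads a segment's statistics resource only if it exists in the metadata store.
    static void readStatsIfPresent(KylinConfig config, CubeSegment segment) throws Exception {
        ResourceStore rs = ResourceStore.getStore(config);
        String statsPath = segment.getStatisticsResourcePath();
        if (!rs.exists(statsPath)) {
            return; // e.g. the merge target segment, which has not been built yet
        }
        InputStream is = rs.getResource(statsPath).inputStream;
        try {
            // consume the statistics SequenceFile bytes, as SparkMergingDictionary does below
        } finally {
            IOUtils.closeStream(is);
        }
    }
}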
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkMergingDictionary.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkMergingDictionary.java
new file mode 100644
index 0000000..deb7968
--- /dev/null
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkMergingDictionary.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.InputStream;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.util.AbstractApplication;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.OptionsHelper;
+import org.apache.kylin.cube.CubeDescManager;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.dict.DictionaryInfo;
+import org.apache.kylin.dict.DictionaryManager;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.CubeStatsWriter;
+import org.apache.kylin.engine.mr.common.SerializableConfiguration;
+import org.apache.kylin.measure.hllc.HLLCounter;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.PairFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import scala.Tuple2;
+
+/**
+ * Merge dictionaries and statistics of the merging segments using Spark.
+ */
+public class SparkMergingDictionary extends AbstractApplication implements Serializable {
+ protected static final Logger logger = LoggerFactory.getLogger(SparkMergingDictionary.class);
+
+ public static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg()
+ .isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME);
+ public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segmentId").hasArg().isRequired(true)
+ .withDescription("Cube Segment Id").create("segmentId");
+ public static final Option OPTION_META_URL = OptionBuilder.withArgName("metaUrl").hasArg().isRequired(true)
+ .withDescription("HDFS metadata url").create("metaUrl");
+ public static final Option OPTION_MERGE_SEGMENT_IDS = OptionBuilder.withArgName("segmentIds").hasArg()
+ .isRequired(true).withDescription("Merging Cube Segment Ids").create("segmentIds");
+ public static final Option OPTION_OUTPUT_PATH_DICT = OptionBuilder.withArgName("dictOutputPath").hasArg()
+ .isRequired(true).withDescription("merged dictionary resource path").create("dictOutputPath");
+ public static final Option OPTION_OUTPUT_PATH_STAT = OptionBuilder.withArgName("statOutputPath").hasArg()
+ .isRequired(true).withDescription("merged statistics resource path").create("statOutputPath");
+
+ private Options options;
+
+ public SparkMergingDictionary() {
+ options = new Options();
+ options.addOption(OPTION_CUBE_NAME);
+ options.addOption(OPTION_SEGMENT_ID);
+ options.addOption(OPTION_META_URL);
+ options.addOption(OPTION_MERGE_SEGMENT_IDS);
+ options.addOption(OPTION_OUTPUT_PATH_DICT);
+ options.addOption(OPTION_OUTPUT_PATH_STAT);
+ }
+
+ @Override
+ protected Options getOptions() {
+ return options;
+ }
+
+ @Override
+ protected void execute(OptionsHelper optionsHelper) throws Exception {
+ final String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
+ final String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
+ final String metaUrl = optionsHelper.getOptionValue(OPTION_META_URL);
+ final String segmentIds = optionsHelper.getOptionValue(OPTION_MERGE_SEGMENT_IDS);
+ final String dictOutputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH_DICT);
+ final String statOutputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH_STAT);
+
+ Class[] kryoClassArray = new Class[] { Class.forName("scala.reflect.ClassTag$$anon$1"),
+ Class.forName("scala.collection.mutable.WrappedArray$ofRef") };
+
+ SparkConf conf = new SparkConf().setAppName("Merge dictionary for cube:" + cubeName + ", segment " + segmentId);
+ //serialization conf
+ conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+ conf.set("spark.kryo.registrator", "org.apache.kylin.engine.spark.KylinKryoRegistrator");
+ conf.set("spark.kryo.registrationRequired", "true").registerKryoClasses(kryoClassArray);
+
+ JavaSparkContext sc = new JavaSparkContext(conf);
+ KylinSparkJobListener jobListener = new KylinSparkJobListener();
+ sc.sc().addSparkListener(jobListener);
+
+ HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(dictOutputPath));
+
+ final SerializableConfiguration sConf = new SerializableConfiguration(sc.hadoopConfiguration());
+ final KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
+
+ final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);
+ final CubeDesc cubeDesc = CubeDescManager.getInstance(envConfig).getCubeDesc(cubeInstance.getDescName());
+
+ logger.info("Dictionary output path: {}", dictOutputPath);
+ logger.info("Statistics output path: {}", statOutputPath);
+
+ final TblColRef[] tblColRefs = cubeDesc.getAllColumnsNeedDictionaryBuilt().toArray(new TblColRef[0]);
+ final int columnLength = tblColRefs.length;
+
+ List<Integer> indexs = Lists.newArrayListWithCapacity(columnLength);
+
+ for (int i = 0; i <= columnLength; i++) {
+ indexs.add(i);
+ }
+
+ JavaRDD<Integer> indexRDD = sc.parallelize(indexs, columnLength + 1);
+
+ JavaPairRDD<Text, Text> colToDictPathRDD = indexRDD.mapToPair(new MergeDictAndStatsFunction(cubeName, metaUrl,
+ segmentId, segmentIds.split(","), statOutputPath, tblColRefs, sConf));
+
+ colToDictPathRDD.coalesce(1, false).saveAsNewAPIHadoopFile(dictOutputPath, Text.class, Text.class, SequenceFileOutputFormat.class);
+ }
+
+ static public class MergeDictAndStatsFunction implements PairFunction<Integer, Text, Text> {
+ private volatile transient boolean initialized = false;
+ private String cubeName;
+ private String metaUrl;
+ private String segmentId;
+ private String[] segmentIds;
+ private String statOutputPath;
+ private TblColRef[] tblColRefs;
+ private SerializableConfiguration conf;
+ private DictionaryManager dictMgr;
+ private KylinConfig kylinConfig;
+ private List<CubeSegment> mergingSegments;
+
+ public MergeDictAndStatsFunction(String cubeName, String metaUrl, String segmentId, String[] segmentIds,
+ String statOutputPath, TblColRef[] tblColRefs, SerializableConfiguration conf) {
+ this.cubeName = cubeName;
+ this.metaUrl = metaUrl;
+ this.segmentId = segmentId;
+ this.segmentIds = segmentIds;
+ this.statOutputPath = statOutputPath;
+ this.tblColRefs = tblColRefs;
+ this.conf = conf;
+ }
+
+ private void init() {
+ kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
+ KylinConfig.setAndUnsetThreadLocalConfig(kylinConfig);
+ CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
+ dictMgr = DictionaryManager.getInstance(kylinConfig);
+ mergingSegments = getMergingSegments(cubeInstance, segmentIds);
+ }
+
+ @Override
+ public Tuple2<Text, Text> call(Integer index) throws Exception {
+ if (initialized == false) {
+ synchronized (SparkMergingDictionary.class) {
+ if (initialized == false) {
+ init();
+ initialized = true;
+ }
+ }
+ }
+
+ if (index < tblColRefs.length) {
+ // merge dictionary
+ TblColRef col = tblColRefs[index];
+ List<DictionaryInfo> dictInfos = Lists.newArrayList();
+ for (CubeSegment segment : mergingSegments) {
+ if (segment.getDictResPath(col) != null) {
+ DictionaryInfo dictInfo = dictMgr.getDictionaryInfo(segment.getDictResPath(col));
+ if (dictInfo != null && !dictInfos.contains(dictInfo)) {
+ dictInfos.add(dictInfo);
+ }
+ }
+ }
+
+ DictionaryInfo mergedDictInfo = dictMgr.mergeDictionary(dictInfos);
+ String tblCol = col.getTableAlias() + ":" + col.getName();
+ String dictInfoPath = mergedDictInfo == null ? "" : mergedDictInfo.getResourcePath();
+
+ return new Tuple2<>(new Text(tblCol), new Text(dictInfoPath));
+ } else {
+ // merge statistics
+ CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
+ CubeSegment newSegment = cubeInstance.getSegmentById(segmentId);
+ ResourceStore rs = ResourceStore.getStore(kylinConfig);
+
+ Map<Long, HLLCounter> cuboidHLLMap = Maps.newHashMap();
+ Configuration conf = null;
+ int averageSamplingPercentage = 0;
+
+ for (CubeSegment cubeSegment : mergingSegments) {
+ String filePath = cubeSegment.getStatisticsResourcePath();
+ InputStream is = rs.getResource(filePath).inputStream;
+ File tempFile;
+ FileOutputStream tempFileStream = null;
+
+ try {
+ tempFile = File.createTempFile(segmentId, ".seq");
+ tempFileStream = new FileOutputStream(tempFile);
+ org.apache.commons.io.IOUtils.copy(is, tempFileStream);
+ } finally {
+ IOUtils.closeStream(is);
+ IOUtils.closeStream(tempFileStream);
+ }
+
+ FileSystem fs = HadoopUtil.getFileSystem("file:///" + tempFile.getAbsolutePath());
+ SequenceFile.Reader reader = null;
+
+ try {
+ conf = HadoopUtil.getCurrentConfiguration();
+ //noinspection deprecation
+ reader = new SequenceFile.Reader(fs, new Path(tempFile.getAbsolutePath()), conf);
+ LongWritable key = (LongWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+ BytesWritable value = (BytesWritable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+
+ while (reader.next(key, value)) {
+ if (key.get() == 0L) {
+ // sampling percentage;
+ averageSamplingPercentage += Bytes.toInt(value.getBytes());
+ } else if (key.get() > 0) {
+ HLLCounter hll = new HLLCounter(kylinConfig.getCubeStatsHLLPrecision());
+ ByteArray byteArray = new ByteArray(value.getBytes());
+ hll.readRegisters(byteArray.asBuffer());
+
+ if (cuboidHLLMap.get(key.get()) != null) {
+ cuboidHLLMap.get(key.get()).merge(hll);
+ } else {
+ cuboidHLLMap.put(key.get(), hll);
+ }
+ }
+ }
+ } finally {
+ IOUtils.closeStream(reader);
+ }
+ }
+
+ averageSamplingPercentage = averageSamplingPercentage / mergingSegments.size();
+ CubeStatsWriter.writeCuboidStatistics(conf, new Path(statOutputPath), cuboidHLLMap, averageSamplingPercentage);
+ Path statisticsFilePath = new Path(statOutputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
+
+ FileSystem fs = HadoopUtil.getFileSystem(statisticsFilePath, conf);
+ FSDataInputStream fis = fs.open(statisticsFilePath);
+
+ try {
+ // put the statistics to metadata store
+ String statisticsFileName = newSegment.getStatisticsResourcePath();
+ rs.putResource(statisticsFileName, fis, System.currentTimeMillis());
+ } finally {
+ IOUtils.closeStream(fis);
+ }
+
+ return new Tuple2<>(new Text(""), new Text(""));
+ }
+
+ }
+
+ private List<CubeSegment> getMergingSegments(CubeInstance cube, String[] segmentIds) {
+ List<CubeSegment> result = Lists.newArrayListWithCapacity(segmentIds.length);
+ for (String id : segmentIds) {
+ result.add(cube.getSegmentById(id));
+ }
+ return result;
+ }
+ }
+
+}
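Finally, a hedged illustration (not part of the patch) of how the parameters set by createMergeDictionaryStep map onto this application's command-line options. The paths and UUIDs are placeholders, and the direct execute(String[]) call assumes the AbstractApplication entry point that Kylin's SparkEntry normally invokes on the driver:

import org.apache.kylin.engine.spark.SparkMergingDictionary;

public class MergeDictionaryLauncherSketch {
    public static void main(String[] cliArgs) throws Exception {
        // Placeholder values for illustration; in production SparkExecutable builds
        // the equivalent spark-submit command and dispatches to this class.
        String[] args = new String[] {
                "-" + SparkMergingDictionary.OPTION_CUBE_NAME.getOpt(), "sample_cube",
                "-" + SparkMergingDictionary.OPTION_SEGMENT_ID.getOpt(), "merged-segment-uuid",
                "-" + SparkMergingDictionary.OPTION_META_URL.getOpt(), "kylin_metadata@hdfs,path=hdfs:///kylin/<job-id>/metadata",
                "-" + SparkMergingDictionary.OPTION_MERGE_SEGMENT_IDS.getOpt(), "segment-uuid-1,segment-uuid-2",
                "-" + SparkMergingDictionary.OPTION_OUTPUT_PATH_DICT.getOpt(), "hdfs:///kylin/<job-id>/dict_info",
                "-" + SparkMergingDictionary.OPTION_OUTPUT_PATH_STAT.getOpt(), "hdfs:///kylin/<job-id>/statistics"
        };
        new SparkMergingDictionary().execute(args); // parses the options and runs the merge job
    }
}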