Posted to commits@kylin.apache.org by sh...@apache.org on 2018/07/21 00:39:43 UTC
[kylin] branch master updated: KYLIN-3441 Merge segments in spark
This is an automated email from the ASF dual-hosted git repository.
shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push:
new c9d9b2e KYLIN-3441 Merge segments in spark
c9d9b2e is described below
commit c9d9b2ed3e2f1c0fe9bf6c94ad87ed2a80758a18
Author: shaofengshi <sh...@apache.org>
AuthorDate: Tue Jul 17 19:17:40 2018 +0800
KYLIN-3441 Merge segments in spark
---
.../kylin/engine/spark/SparkCubingByLayer.java | 2 +-
.../kylin/engine/spark/SparkCubingMerge.java | 145 +++++++++++++--------
.../org/apache/kylin/engine/spark/SparkUtil.java | 21 ++-
.../kylin/storage/hbase/steps/SparkCubeHFile.java | 9 +-
4 files changed, 110 insertions(+), 67 deletions(-)
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
index 0cc8a3b..e95e433 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
@@ -232,11 +232,11 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializable
partition = SparkUtil.estimateLayerPartitionNum(level, cubeStatsReader, envConfig);
allRDDs[level] = allRDDs[level - 1].flatMapToPair(new CuboidFlatMap(cubeName, segmentId, metaUrl, sConf))
.reduceByKey(reducerFunction2, partition).persist(storageLevel);
+ allRDDs[level - 1].unpersist();
if (envConfig.isSparkSanityCheckEnabled() == true) {
sanityCheck(allRDDs[level], totalCount, level, cubeStatsReader, countMeasureIndex);
}
saveToHDFS(allRDDs[level], metaUrl, cubeName, cubeSegment, outputPath, level, job, envConfig);
- allRDDs[level - 1].unpersist();
}
allRDDs[totalLevels].unpersist();
logger.info("Finished on calculating all level cuboids.");
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java
index 8d78920..7fd4123 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java
@@ -42,6 +42,7 @@ import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.mr.BatchCubingJobBuilder2;
import org.apache.kylin.engine.mr.JobBuilderSupport;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.BatchConstants;
@@ -81,6 +82,9 @@ public class SparkCubingMerge extends AbstractApplication implements Serializable
private Options options;
+ private String cubeName;
+ private String metaUrl;
+
public SparkCubingMerge() {
options = new Options();
options.addOption(OPTION_META_URL);
@@ -97,9 +101,9 @@ public class SparkCubingMerge extends AbstractApplication implements Serializable
@Override
protected void execute(OptionsHelper optionsHelper) throws Exception {
- final String metaUrl = optionsHelper.getOptionValue(OPTION_META_URL);
+ this.metaUrl = optionsHelper.getOptionValue(OPTION_META_URL);
+ this.cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
final String inputPath = optionsHelper.getOptionValue(OPTION_INPUT_PATH);
- final String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
final String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
final String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
@@ -127,76 +131,107 @@ public class SparkCubingMerge extends AbstractApplication implements Serializable
logger.info("Input path: {}", inputPath);
logger.info("Output path: {}", outputPath);
- String[] inputFolders = StringSplitter.split(inputPath, ",");
-
- FileSystem fs = HadoopUtil.getWorkingFileSystem();
- List<JavaPairRDD<Text, Object[]>> mergingSegs = Lists.newArrayList();
- for (int i = 0; i < inputFolders.length; i++) {
- String path = inputFolders[i];
- CubeSegment sourceSegment = findSourceSegment(path, cubeInstance);
-
- // each folder will be a rdd, union them
- List<JavaPairRDD> cuboidRdds = SparkUtil.parseInputPath(path, fs, sc, Text.class, Text.class);
- JavaPairRDD<Text, Text> segRdd = sc.union(cuboidRdds.toArray(new JavaPairRDD[cuboidRdds.size()]));
-
- // re-encode with new dictionaries
- JavaPairRDD<Text, Object[]> newEcoddedRdd = segRdd.mapToPair(new ReEncodCuboidFunction(cubeName,
- sourceSegment.getUuid(), cubeSegment.getUuid(), metaUrl, sConf));
- mergingSegs.add(newEcoddedRdd);
- }
-
- final MeasureAggregators aggregators = new MeasureAggregators(cubeDesc.getMeasures());
-
- // reduce
- JavaPairRDD<Text, Object[]> mergedRdd = sc.union(mergingSegs.toArray(new JavaPairRDD[mergingSegs.size()]))
- .reduceByKey(new Function2<Object[], Object[], Object[]>() {
- @Override
- public Object[] call(Object[] input1, Object[] input2) throws Exception {
- Object[] measureObjs = new Object[input1.length];
- aggregators.aggregate(input1, input2, measureObjs);
- return measureObjs;
- }
- }, SparkUtil.estimateTotalPartitionNum(cubeStatsReader, envConfig));
-
Configuration confOverwrite = new Configuration(sc.hadoopConfiguration());
confOverwrite.set("dfs.replication", "2"); // cuboid intermediate files, replication=2
final Job job = Job.getInstance(confOverwrite);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
- FileOutputFormat.setOutputPath(job, new Path(outputPath));
job.setOutputFormatClass(SequenceFileOutputFormat.class);
- mergedRdd.mapToPair(
- new PairFunction<Tuple2<Text, Object[]>, org.apache.hadoop.io.Text, org.apache.hadoop.io.Text>() {
- private volatile transient boolean initialized = false;
- BufferedMeasureCodec codec;
+ final MeasureAggregators aggregators = new MeasureAggregators(cubeDesc.getMeasures());
+ final Function2 reduceFunction = new Function2<Object[], Object[], Object[]>() {
+ @Override
+ public Object[] call(Object[] input1, Object[] input2) throws Exception {
+ Object[] measureObjs = new Object[input1.length];
+ aggregators.aggregate(input1, input2, measureObjs);
+ return measureObjs;
+ }
+ };
+
+ final PairFunction convertTextFunction = new PairFunction<Tuple2<Text, Object[]>, org.apache.hadoop.io.Text, org.apache.hadoop.io.Text>() {
+ private volatile transient boolean initialized = false;
+ BufferedMeasureCodec codec;
- @Override
- public Tuple2<org.apache.hadoop.io.Text, org.apache.hadoop.io.Text> call(
- Tuple2<Text, Object[]> tuple2) throws Exception {
+ @Override
+ public Tuple2<org.apache.hadoop.io.Text, org.apache.hadoop.io.Text> call(Tuple2<Text, Object[]> tuple2)
+ throws Exception {
+ if (initialized == false) {
+ synchronized (SparkCubingByLayer.class) {
if (initialized == false) {
- synchronized (SparkCubingByLayer.class) {
- if (initialized == false) {
- KylinConfig kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
- CubeDesc desc = CubeDescManager.getInstance(kylinConfig).getCubeDesc(cubeName);
- codec = new BufferedMeasureCodec(desc.getMeasures());
- initialized = true;
- }
- }
+ KylinConfig kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
+ CubeDesc desc = CubeDescManager.getInstance(kylinConfig).getCubeDesc(cubeName);
+ codec = new BufferedMeasureCodec(desc.getMeasures());
+ initialized = true;
}
- ByteBuffer valueBuf = codec.encode(tuple2._2());
- byte[] encodedBytes = new byte[valueBuf.position()];
- System.arraycopy(valueBuf.array(), 0, encodedBytes, 0, valueBuf.position());
- return new Tuple2<>(tuple2._1(), new org.apache.hadoop.io.Text(encodedBytes));
}
+ }
+ ByteBuffer valueBuf = codec.encode(tuple2._2());
+ byte[] encodedBytes = new byte[valueBuf.position()];
+ System.arraycopy(valueBuf.array(), 0, encodedBytes, 0, valueBuf.position());
+ return new Tuple2<>(tuple2._1(), new org.apache.hadoop.io.Text(encodedBytes));
+ }
+ };
+
+ final int totalLevels = cubeSegment.getCuboidScheduler().getBuildLevel();
+ final String[] inputFolders = StringSplitter.split(inputPath, ",");
+ FileSystem fs = HadoopUtil.getWorkingFileSystem();
+ boolean isLegacyMode = false;
+ for (String inputFolder : inputFolders) {
+ Path baseCuboidPath = new Path(BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(inputFolder, 0));
+ if (fs.exists(baseCuboidPath) == false) {
+ // the level sub folder doesn't exist, meaning this segment's cuboids were merged into one folder (not by layer)
+ isLegacyMode = true;
+ break;
+ }
+ }
- }).saveAsNewAPIHadoopDataset(job.getConfiguration());
+ if (isLegacyMode == true) {
+ // merge all layers' cuboids at once; this can be heavy for Spark
+ List<JavaPairRDD<Text, Object[]>> mergingSegs = Lists.newArrayListWithExpectedSize(inputFolders.length);
+ for (int i = 0; i < inputFolders.length; i++) {
+ String path = inputFolders[i];
+ JavaPairRDD segRdd = SparkUtil.parseInputPath(path, fs, sc, Text.class, Text.class);
+ CubeSegment sourceSegment = findSourceSegment(path, cubeInstance);
+ // re-encode with new dictionaries
+ JavaPairRDD<Text, Object[]> newEcoddedRdd = segRdd.mapToPair(new ReEncodCuboidFunction(cubeName,
+ sourceSegment.getUuid(), cubeSegment.getUuid(), metaUrl, sConf));
+ mergingSegs.add(newEcoddedRdd);
+ }
+
+ FileOutputFormat.setOutputPath(job, new Path(outputPath));
+ sc.union(mergingSegs.toArray(new JavaPairRDD[mergingSegs.size()]))
+ .reduceByKey(reduceFunction, SparkUtil.estimateTotalPartitionNum(cubeStatsReader, envConfig))
+ .mapToPair(convertTextFunction).saveAsNewAPIHadoopDataset(job.getConfiguration());
+
+ } else {
+ // merge by layer
+ for (int level = 0; level <= totalLevels; level++) {
+ List<JavaPairRDD<Text, Object[]>> mergingSegs = Lists.newArrayList();
+ for (int i = 0; i < inputFolders.length; i++) {
+ String path = inputFolders[i];
+ CubeSegment sourceSegment = findSourceSegment(path, cubeInstance);
+ final String cuboidInputPath = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(path, level);
+ JavaPairRDD<Text, Text> segRdd = sc.sequenceFile(cuboidInputPath, Text.class, Text.class);
+ // re-encode with new dictionaries
+ JavaPairRDD<Text, Object[]> newEcoddedRdd = segRdd.mapToPair(new ReEncodCuboidFunction(cubeName,
+ sourceSegment.getUuid(), cubeSegment.getUuid(), metaUrl, sConf));
+ mergingSegs.add(newEcoddedRdd);
+ }
+ final String cuboidOutputPath = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(outputPath, level);
+ FileOutputFormat.setOutputPath(job, new Path(cuboidOutputPath));
+
+ sc.union(mergingSegs.toArray(new JavaPairRDD[mergingSegs.size()]))
+ .reduceByKey(reduceFunction,
+ SparkUtil.estimateLayerPartitionNum(level, cubeStatsReader, envConfig))
+ .mapToPair(convertTextFunction).saveAsNewAPIHadoopDataset(job.getConfiguration());
+ }
+ }
// output the data size to console, job engine will parse and save the metric
// please note: this mechanism won't work when spark.submit.deployMode=cluster
logger.info("HDFS: Number of bytes written=" + jobListener.metrics.getBytesWritten());
- HadoopUtil.deleteHDFSMeta(metaUrl);
+ //HadoopUtil.deleteHDFSMeta(metaUrl);
}
static class ReEncodCuboidFunction implements PairFunction<Tuple2<Text, Text>, Text, Object[]> {
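Both branches above reduce to the same shape: re-encode each source segment with the target segment's dictionaries, union the per-segment RDDs, and reduceByKey with an associative measure aggregator before writing sequence files. (The convertTextFunction also uses the common Spark trick of lazily initializing the measure codec per executor behind double-checked locking, presumably because the codec cannot be shipped with the closure.) A minimal sketch of the union-and-reduce core, using plain Long sums in place of Kylin's MeasureAggregators and hypothetical names throughout:

    import java.util.List;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function2;

    // Rows with the same rowkey in different segments collapse into one
    // record whose measure is combined by the reduce function.
    static JavaPairRDD<String, Long> mergeSegments(JavaSparkContext sc,
            List<JavaPairRDD<String, Long>> segments, int partitions) {
        Function2<Long, Long, Long> reduce = (a, b) -> a + b; // stand-in aggregator
        return sc.union(segments.toArray(new JavaPairRDD[segments.size()]))
                .reduceByKey(reduce, partitions);
    }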
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
index da207ee..e7c6ee6 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.StringUtil;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.EngineFactory;
import org.apache.kylin.engine.mr.IMROutput2;
@@ -60,24 +61,34 @@ public class SparkUtil {
return StorageFactory.createEngineAdapter(seg, IMROutput2.class).getBatchOptimizeOutputSide(seg);
}
- public static List<JavaPairRDD> parseInputPath(String inputPath, FileSystem fs, JavaSparkContext sc, Class keyClass,
+ /**
+ * Read the given path as a JavaPairRDD; the path may contain second-level sub folders.
+ * @param inputPath the HDFS folder (or folder of folders) to read
+ * @param fs the working file system
+ * @param sc the Java Spark context
+ * @param keyClass the key class of the sequence files
+ * @param valueClass the value class of the sequence files
+ * @return a single JavaPairRDD covering all matched folders
+ * @throws IOException if the input path cannot be listed
+ */
+ public static JavaPairRDD parseInputPath(String inputPath, FileSystem fs, JavaSparkContext sc, Class keyClass,
Class valueClass) throws IOException {
- List<JavaPairRDD> inputRDDs = Lists.newArrayList();
+ List<String> inputFolders = Lists.newArrayList();
Path inputHDFSPath = new Path(inputPath);
FileStatus[] fileStatuses = fs.listStatus(inputHDFSPath);
boolean hasDir = false;
for (FileStatus stat : fileStatuses) {
if (stat.isDirectory() && !stat.getPath().getName().startsWith("_")) {
hasDir = true;
- inputRDDs.add(sc.sequenceFile(stat.getPath().toString(), keyClass, valueClass));
+ inputFolders.add(stat.getPath().toString());
}
}
if (!hasDir) {
- inputRDDs.add(sc.sequenceFile(inputHDFSPath.toString(), keyClass, valueClass));
+ return sc.sequenceFile(inputHDFSPath.toString(), keyClass, valueClass);
}
- return inputRDDs;
+ return sc.sequenceFile(StringUtil.join(inputFolders, ","), keyClass, valueClass);
}
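The rewritten parseInputPath relies on Hadoop's FileInputFormat (and hence sc.sequenceFile) accepting a comma-separated list of paths, so several sub folders become a single RDD without building and unioning one RDD per folder. For example, with hypothetical paths:

    import org.apache.hadoop.io.Text;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    // One sequenceFile() call over "dirA,dirB" yields a single RDD whose
    // partitions span both directories.
    JavaPairRDD<Text, Text> rdd = sc.sequenceFile(
            "hdfs:///kylin/cuboid/folder-a,hdfs:///kylin/cuboid/folder-b",
            Text.class, Text.class);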
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
index 0136d4e..c435d9d 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
@@ -159,11 +159,10 @@ public class SparkCubeHFile extends AbstractApplication implements Serializable
logger.info("Input path: {}", inputPath);
logger.info("Output path: {}", outputPath);
- List<JavaPairRDD> inputRDDs = SparkUtil.parseInputPath(inputPath, fs, sc, Text.class, Text.class);
- final JavaPairRDD<Text, Text> allCuboidFile = sc.union(inputRDDs.toArray(new JavaPairRDD[inputRDDs.size()]));
+ JavaPairRDD<Text, Text> inputRDDs = SparkUtil.parseInputPath(inputPath, fs, sc, Text.class, Text.class);
final JavaPairRDD<RowKeyWritable, KeyValue> hfilerdd;
if (quickPath) {
- hfilerdd = allCuboidFile.mapToPair(new PairFunction<Tuple2<Text, Text>, RowKeyWritable, KeyValue>() {
+ hfilerdd = inputRDDs.mapToPair(new PairFunction<Tuple2<Text, Text>, RowKeyWritable, KeyValue>() {
@Override
public Tuple2<RowKeyWritable, KeyValue> call(Tuple2<Text, Text> textTextTuple2) throws Exception {
KeyValue outputValue = keyValueCreators.get(0).create(textTextTuple2._1,
@@ -172,7 +171,7 @@ public class SparkCubeHFile extends AbstractApplication implements Serializable
}
});
} else {
- hfilerdd = allCuboidFile
+ hfilerdd = inputRDDs
.flatMapToPair(new PairFlatMapFunction<Tuple2<Text, Text>, RowKeyWritable, KeyValue>() {
@Override
public Iterator<Tuple2<RowKeyWritable, KeyValue>> call(Tuple2<Text, Text> textTextTuple2)
@@ -195,8 +194,6 @@ public class SparkCubeHFile extends AbstractApplication implements Serializable
});
}
- allCuboidFile.unpersist();
-
// read partition split keys
List<RowKeyWritable> keys = new ArrayList<>();
try (SequenceFile.Reader reader = new SequenceFile.Reader(fs, partitionFilePath, sc.hadoopConfiguration())) {
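For context on the quick path versus the general path in SparkCubeHFile: mapToPair emits exactly one tuple per input row, while flatMapToPair may emit several (for instance one KeyValue per HBase column family). A generic sketch of the difference, with an assumed input of type JavaPairRDD<String, String> standing in for the HBase-specific classes:

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.spark.api.java.JavaPairRDD;
    import scala.Tuple2;

    // Quick path: one output tuple per input tuple.
    JavaPairRDD<String, Integer> one = input.mapToPair(
            t -> new Tuple2<>(t._1(), t._2().length()));

    // General path: zero or more output tuples per input tuple.
    JavaPairRDD<String, Integer> many = input.flatMapToPair(t -> {
        List<Tuple2<String, Integer>> out = new ArrayList<>();
        for (String piece : t._2().split(",")) {
            out.add(new Tuple2<>(t._1(), piece.length()));
        }
        return out.iterator();
    });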