You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2018/07/21 00:39:43 UTC

[kylin] branch master updated: KYLIN-3441 Merge segments in spark

This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new c9d9b2e  KYLIN-3441 Merge segments in spark
c9d9b2e is described below

commit c9d9b2ed3e2f1c0fe9bf6c94ad87ed2a80758a18
Author: shaofengshi <sh...@apache.org>
AuthorDate: Tue Jul 17 19:17:40 2018 +0800

    KYLIN-3441 Merge segments in spark
---
 .../kylin/engine/spark/SparkCubingByLayer.java     |   2 +-
 .../kylin/engine/spark/SparkCubingMerge.java       | 145 +++++++++++++--------
 .../org/apache/kylin/engine/spark/SparkUtil.java   |  21 ++-
 .../kylin/storage/hbase/steps/SparkCubeHFile.java  |   9 +-
 4 files changed, 110 insertions(+), 67 deletions(-)

diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
index 0cc8a3b..e95e433 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
@@ -232,11 +232,11 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
             partition = SparkUtil.estimateLayerPartitionNum(level, cubeStatsReader, envConfig);
             allRDDs[level] = allRDDs[level - 1].flatMapToPair(new CuboidFlatMap(cubeName, segmentId, metaUrl, sConf))
                     .reduceByKey(reducerFunction2, partition).persist(storageLevel);
+            allRDDs[level - 1].unpersist();
             if (envConfig.isSparkSanityCheckEnabled() == true) {
                 sanityCheck(allRDDs[level], totalCount, level, cubeStatsReader, countMeasureIndex);
             }
             saveToHDFS(allRDDs[level], metaUrl, cubeName, cubeSegment, outputPath, level, job, envConfig);
-            allRDDs[level - 1].unpersist();
         }
         allRDDs[totalLevels].unpersist();
         logger.info("Finished on calculating all level cuboids.");
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java
index 8d78920..7fd4123 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java
@@ -42,6 +42,7 @@ import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.mr.BatchCubingJobBuilder2;
 import org.apache.kylin.engine.mr.JobBuilderSupport;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
@@ -81,6 +82,9 @@ public class SparkCubingMerge extends AbstractApplication implements Serializabl
 
     private Options options;
 
+    private String cubeName;
+    private String metaUrl;
+
     public SparkCubingMerge() {
         options = new Options();
         options.addOption(OPTION_META_URL);
@@ -97,9 +101,9 @@ public class SparkCubingMerge extends AbstractApplication implements Serializabl
 
     @Override
     protected void execute(OptionsHelper optionsHelper) throws Exception {
-        final String metaUrl = optionsHelper.getOptionValue(OPTION_META_URL);
+        this.metaUrl = optionsHelper.getOptionValue(OPTION_META_URL);
+        this.cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
         final String inputPath = optionsHelper.getOptionValue(OPTION_INPUT_PATH);
-        final String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
         final String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
         final String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
 
@@ -127,76 +131,107 @@ public class SparkCubingMerge extends AbstractApplication implements Serializabl
         logger.info("Input path: {}", inputPath);
         logger.info("Output path: {}", outputPath);
 
-        String[] inputFolders = StringSplitter.split(inputPath, ",");
-
-        FileSystem fs = HadoopUtil.getWorkingFileSystem();
-        List<JavaPairRDD<Text, Object[]>> mergingSegs = Lists.newArrayList();
-        for (int i = 0; i < inputFolders.length; i++) {
-            String path = inputFolders[i];
-            CubeSegment sourceSegment = findSourceSegment(path, cubeInstance);
-
-            // each folder will be a rdd, union them
-            List<JavaPairRDD> cuboidRdds = SparkUtil.parseInputPath(path, fs, sc, Text.class, Text.class);
-            JavaPairRDD<Text, Text> segRdd = sc.union(cuboidRdds.toArray(new JavaPairRDD[cuboidRdds.size()]));
-
-            // re-encode with new dictionaries
-            JavaPairRDD<Text, Object[]> newEcoddedRdd = segRdd.mapToPair(new ReEncodCuboidFunction(cubeName,
-                    sourceSegment.getUuid(), cubeSegment.getUuid(), metaUrl, sConf));
-            mergingSegs.add(newEcoddedRdd);
-        }
-
-        final MeasureAggregators aggregators = new MeasureAggregators(cubeDesc.getMeasures());
-
-        // reduce
-        JavaPairRDD<Text, Object[]> mergedRdd = sc.union(mergingSegs.toArray(new JavaPairRDD[mergingSegs.size()]))
-                .reduceByKey(new Function2<Object[], Object[], Object[]>() {
-                    @Override
-                    public Object[] call(Object[] input1, Object[] input2) throws Exception {
-                        Object[] measureObjs = new Object[input1.length];
-                        aggregators.aggregate(input1, input2, measureObjs);
-                        return measureObjs;
-                    }
-                }, SparkUtil.estimateTotalPartitionNum(cubeStatsReader, envConfig));
-
         Configuration confOverwrite = new Configuration(sc.hadoopConfiguration());
         confOverwrite.set("dfs.replication", "2"); // cuboid intermediate files, replication=2
         final Job job = Job.getInstance(confOverwrite);
         job.setOutputKeyClass(Text.class);
         job.setOutputValueClass(Text.class);
-        FileOutputFormat.setOutputPath(job, new Path(outputPath));
         job.setOutputFormatClass(SequenceFileOutputFormat.class);
 
-        mergedRdd.mapToPair(
-                new PairFunction<Tuple2<Text, Object[]>, org.apache.hadoop.io.Text, org.apache.hadoop.io.Text>() {
-                    private volatile transient boolean initialized = false;
-                    BufferedMeasureCodec codec;
+        final MeasureAggregators aggregators = new MeasureAggregators(cubeDesc.getMeasures());
+        final Function2 reduceFunction = new Function2<Object[], Object[], Object[]>() {
+            @Override
+            public Object[] call(Object[] input1, Object[] input2) throws Exception {
+                Object[] measureObjs = new Object[input1.length];
+                aggregators.aggregate(input1, input2, measureObjs);
+                return measureObjs;
+            }
+        };
+
+        final PairFunction convertTextFunction = new PairFunction<Tuple2<Text, Object[]>, org.apache.hadoop.io.Text, org.apache.hadoop.io.Text>() {
+            private volatile transient boolean initialized = false;
+            BufferedMeasureCodec codec;
 
-                    @Override
-                    public Tuple2<org.apache.hadoop.io.Text, org.apache.hadoop.io.Text> call(
-                            Tuple2<Text, Object[]> tuple2) throws Exception {
+            @Override
+            public Tuple2<org.apache.hadoop.io.Text, org.apache.hadoop.io.Text> call(Tuple2<Text, Object[]> tuple2)
+                    throws Exception {
 
+                if (initialized == false) {
+                    synchronized (SparkCubingByLayer.class) {
                         if (initialized == false) {
-                            synchronized (SparkCubingByLayer.class) {
-                                if (initialized == false) {
-                                    KylinConfig kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
-                                    CubeDesc desc = CubeDescManager.getInstance(kylinConfig).getCubeDesc(cubeName);
-                                    codec = new BufferedMeasureCodec(desc.getMeasures());
-                                    initialized = true;
-                                }
-                            }
+                            KylinConfig kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
+                            CubeDesc desc = CubeDescManager.getInstance(kylinConfig).getCubeDesc(cubeName);
+                            codec = new BufferedMeasureCodec(desc.getMeasures());
+                            initialized = true;
                         }
-                        ByteBuffer valueBuf = codec.encode(tuple2._2());
-                        byte[] encodedBytes = new byte[valueBuf.position()];
-                        System.arraycopy(valueBuf.array(), 0, encodedBytes, 0, valueBuf.position());
-                        return new Tuple2<>(tuple2._1(), new org.apache.hadoop.io.Text(encodedBytes));
                     }
+                }
+                ByteBuffer valueBuf = codec.encode(tuple2._2());
+                byte[] encodedBytes = new byte[valueBuf.position()];
+                System.arraycopy(valueBuf.array(), 0, encodedBytes, 0, valueBuf.position());
+                return new Tuple2<>(tuple2._1(), new org.apache.hadoop.io.Text(encodedBytes));
+            }
+        };
+
+        final int totalLevels = cubeSegment.getCuboidScheduler().getBuildLevel();
+        final String[] inputFolders = StringSplitter.split(inputPath, ",");
+        FileSystem fs = HadoopUtil.getWorkingFileSystem();
+        boolean isLegacyMode = false;
+        for (String inputFolder : inputFolders) {
+            Path baseCuboidPath = new Path(BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(inputFolder, 0));
+            if (fs.exists(baseCuboidPath) == false) {
+                // doesn't exist sub folder, that means the merged cuboid in one folder (not by layer)
+                isLegacyMode = true;
+                break;
+            }
+        }
 
-                }).saveAsNewAPIHadoopDataset(job.getConfiguration());
+        if (isLegacyMode == true) {
+            // merge all layer's cuboid at once, this might be hard for Spark
+            List<JavaPairRDD<Text, Object[]>> mergingSegs = Lists.newArrayListWithExpectedSize(inputFolders.length);
+            for (int i = 0; i < inputFolders.length; i++) {
+                String path = inputFolders[i];
+                JavaPairRDD segRdd = SparkUtil.parseInputPath(path, fs, sc, Text.class, Text.class);
+                CubeSegment sourceSegment = findSourceSegment(path, cubeInstance);
+                // re-encode with new dictionaries
+                JavaPairRDD<Text, Object[]> newEcoddedRdd = segRdd.mapToPair(new ReEncodCuboidFunction(cubeName,
+                        sourceSegment.getUuid(), cubeSegment.getUuid(), metaUrl, sConf));
+                mergingSegs.add(newEcoddedRdd);
+            }
+
+            FileOutputFormat.setOutputPath(job, new Path(outputPath));
+            sc.union(mergingSegs.toArray(new JavaPairRDD[mergingSegs.size()]))
+                    .reduceByKey(reduceFunction, SparkUtil.estimateTotalPartitionNum(cubeStatsReader, envConfig))
+                    .mapToPair(convertTextFunction).saveAsNewAPIHadoopDataset(job.getConfiguration());
+
+        } else {
+            // merge by layer
+            for (int level = 0; level <= totalLevels; level++) {
+                List<JavaPairRDD<Text, Object[]>> mergingSegs = Lists.newArrayList();
+                for (int i = 0; i < inputFolders.length; i++) {
+                    String path = inputFolders[i];
+                    CubeSegment sourceSegment = findSourceSegment(path, cubeInstance);
+                    final String cuboidInputPath = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(path, level);
+                    JavaPairRDD<Text, Text> segRdd = sc.sequenceFile(cuboidInputPath, Text.class, Text.class);
+                    // re-encode with new dictionaries
+                    JavaPairRDD<Text, Object[]> newEcoddedRdd = segRdd.mapToPair(new ReEncodCuboidFunction(cubeName,
+                            sourceSegment.getUuid(), cubeSegment.getUuid(), metaUrl, sConf));
+                    mergingSegs.add(newEcoddedRdd);
+                }
 
+                final String cuboidOutputPath = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(outputPath, level);
+                FileOutputFormat.setOutputPath(job, new Path(cuboidOutputPath));
+
+                sc.union(mergingSegs.toArray(new JavaPairRDD[mergingSegs.size()]))
+                        .reduceByKey(reduceFunction,
+                                SparkUtil.estimateLayerPartitionNum(level, cubeStatsReader, envConfig))
+                        .mapToPair(convertTextFunction).saveAsNewAPIHadoopDataset(job.getConfiguration());
+            }
+        }
         // output the data size to console, job engine will parse and save the metric
         // please note: this mechanism won't work when spark.submit.deployMode=cluster
         logger.info("HDFS: Number of bytes written=" + jobListener.metrics.getBytesWritten());
-        HadoopUtil.deleteHDFSMeta(metaUrl);
+        //HadoopUtil.deleteHDFSMeta(metaUrl);
     }
 
     static class ReEncodCuboidFunction implements PairFunction<Tuple2<Text, Text>, Text, Object[]> {
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
index da207ee..e7c6ee6 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.StringUtil;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.EngineFactory;
 import org.apache.kylin.engine.mr.IMROutput2;
@@ -60,24 +61,34 @@ public class SparkUtil {
         return StorageFactory.createEngineAdapter(seg, IMROutput2.class).getBatchOptimizeOutputSide(seg);
     }
 
-    public static List<JavaPairRDD> parseInputPath(String inputPath, FileSystem fs, JavaSparkContext sc, Class keyClass,
+    /**
+     * Read the given path as a Java RDD; The path can have second level sub folder.
+     * @param inputPath
+     * @param fs
+     * @param sc
+     * @param keyClass
+     * @param valueClass
+     * @return
+     * @throws IOException
+     */
+    public static JavaPairRDD parseInputPath(String inputPath, FileSystem fs, JavaSparkContext sc, Class keyClass,
             Class valueClass) throws IOException {
-        List<JavaPairRDD> inputRDDs = Lists.newArrayList();
+        List<String> inputFolders = Lists.newArrayList();
         Path inputHDFSPath = new Path(inputPath);
         FileStatus[] fileStatuses = fs.listStatus(inputHDFSPath);
         boolean hasDir = false;
         for (FileStatus stat : fileStatuses) {
             if (stat.isDirectory() && !stat.getPath().getName().startsWith("_")) {
                 hasDir = true;
-                inputRDDs.add(sc.sequenceFile(stat.getPath().toString(), keyClass, valueClass));
+                inputFolders.add(stat.getPath().toString());
             }
         }
 
         if (!hasDir) {
-            inputRDDs.add(sc.sequenceFile(inputHDFSPath.toString(), keyClass, valueClass));
+            return sc.sequenceFile(inputHDFSPath.toString(), keyClass, valueClass);
         }
 
-        return inputRDDs;
+        return sc.sequenceFile(StringUtil.join(inputFolders, ","), keyClass, valueClass);
     }
 
 
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
index 0136d4e..c435d9d 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
@@ -159,11 +159,10 @@ public class SparkCubeHFile extends AbstractApplication implements Serializable
         logger.info("Input path: {}", inputPath);
         logger.info("Output path: {}", outputPath);
 
-        List<JavaPairRDD> inputRDDs = SparkUtil.parseInputPath(inputPath, fs, sc, Text.class, Text.class);
-        final JavaPairRDD<Text, Text> allCuboidFile = sc.union(inputRDDs.toArray(new JavaPairRDD[inputRDDs.size()]));
+        JavaPairRDD<Text, Text> inputRDDs = SparkUtil.parseInputPath(inputPath, fs, sc, Text.class, Text.class);
         final JavaPairRDD<RowKeyWritable, KeyValue> hfilerdd;
         if (quickPath) {
-            hfilerdd = allCuboidFile.mapToPair(new PairFunction<Tuple2<Text, Text>, RowKeyWritable, KeyValue>() {
+            hfilerdd = inputRDDs.mapToPair(new PairFunction<Tuple2<Text, Text>, RowKeyWritable, KeyValue>() {
                 @Override
                 public Tuple2<RowKeyWritable, KeyValue> call(Tuple2<Text, Text> textTextTuple2) throws Exception {
                     KeyValue outputValue = keyValueCreators.get(0).create(textTextTuple2._1,
@@ -172,7 +171,7 @@ public class SparkCubeHFile extends AbstractApplication implements Serializable
                 }
             });
         } else {
-            hfilerdd = allCuboidFile
+            hfilerdd = inputRDDs
                     .flatMapToPair(new PairFlatMapFunction<Tuple2<Text, Text>, RowKeyWritable, KeyValue>() {
                         @Override
                         public Iterator<Tuple2<RowKeyWritable, KeyValue>> call(Tuple2<Text, Text> textTextTuple2)
@@ -195,8 +194,6 @@ public class SparkCubeHFile extends AbstractApplication implements Serializable
                     });
         }
 
-        allCuboidFile.unpersist();
-
         // read partition split keys
         List<RowKeyWritable> keys = new ArrayList<>();
         try (SequenceFile.Reader reader = new SequenceFile.Reader(fs, partitionFilePath, sc.hadoopConfiguration())) {