Posted to commits@kylin.apache.org by sh...@apache.org on 2019/03/16 00:32:21 UTC

[kylin] branch engine-flink updated: KYLIN-3850 Flink cubing step : merge by layer

This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch engine-flink
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/engine-flink by this push:
     new 04394df  KYLIN-3850 Flink cubing step : merge by layer
04394df is described below

commit 04394df3b3a5bb664fbd66d323a7769d431692d8
Author: yanghua <ya...@gmail.com>
AuthorDate: Fri Mar 15 20:16:23 2019 +0800

    KYLIN-3850 Flink cubing step : merge by layer
---
 .../kylin/engine/flink/FlinkCubingMerge.java       | 309 +++++++++++++++++++++
 1 file changed, 309 insertions(+)
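
At its core, the merge step re-encodes each source segment's cuboid rows with the merged
segment's dictionaries, unions the segments, groups rows by cuboid row key, and reduces the
measure columns with Kylin's MeasureAggregators before writing SequenceFiles. Below is a
minimal, self-contained sketch of that union/group/reduce pattern on the Flink DataSet API;
the class name and the String/Long sample values are illustrative stand-ins only, while the
real job (in the diff below) works on Text row keys and Object[] measures.

    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class MergeByKeySketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // two "segments" with overlapping row keys stand in for re-encoded cuboid rows
            DataSet<Tuple2<String, Long>> segA = env.fromElements(
                    new Tuple2<>("rowkey-1", 10L), new Tuple2<>("rowkey-2", 5L));
            DataSet<Tuple2<String, Long>> segB = env.fromElements(
                    new Tuple2<>("rowkey-1", 7L), new Tuple2<>("rowkey-3", 1L));

            // union the segments, group by the row key (field 0) and aggregate the measure,
            // mirroring unionedDataSet.groupBy(0).reduce(new MeasureReduceFunction(...))
            segA.union(segB)
                    .groupBy(0)
                    .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                        @Override
                        public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> v2) {
                            return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
                        }
                    })
                    .print();
        }
    }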

diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingMerge.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingMerge.java
new file mode 100644
index 0000000..c51d69c
--- /dev/null
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingMerge.java
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.engine.flink;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.hadoopcompatibility.HadoopInputs;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.AbstractApplication;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.OptionsHelper;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.common.util.StringSplitter;
+import org.apache.kylin.cube.CubeDescManager;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.mr.BatchCubingJobBuilder2;
+import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.CubeStatsReader;
+import org.apache.kylin.engine.mr.common.SerializableConfiguration;
+import org.apache.kylin.engine.mr.steps.SegmentReEncoder;
+import org.apache.kylin.measure.BufferedMeasureCodec;
+import org.apache.kylin.measure.MeasureAggregators;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ * Flink application to merge cube segments with the "by-layer" algorithm. It only supports
+ * source data from Hive, with metadata stored in HBase.
+ */
+public class FlinkCubingMerge extends AbstractApplication implements Serializable {
+
+    protected static final Logger logger = LoggerFactory.getLogger(FlinkCubingMerge.class);
+
+    public static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg()
+            .isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME);
+    public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segment").hasArg().isRequired(true)
+            .withDescription("Cube Segment Id").create("segmentId");
+    public static final Option OPTION_META_URL = OptionBuilder.withArgName("metaUrl").hasArg().isRequired(true)
+            .withDescription("HDFS metadata url").create("metaUrl");
+    public static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg()
+            .isRequired(true).withDescription("Merged cuboid output path").create(BatchConstants.ARG_OUTPUT);
+    public static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg()
+            .isRequired(true).withDescription("Cuboid files path").create(BatchConstants.ARG_INPUT);
+
+    private Options options;
+
+    private String cubeName;
+    private String metaUrl;
+
+    public FlinkCubingMerge() {
+        options = new Options();
+        options.addOption(OPTION_META_URL);
+        options.addOption(OPTION_CUBE_NAME);
+        options.addOption(OPTION_SEGMENT_ID);
+        options.addOption(OPTION_INPUT_PATH);
+        options.addOption(OPTION_OUTPUT_PATH);
+    }
+
+    @Override
+    protected Options getOptions() {
+        return options;
+    }
+
+    @Override
+    protected void execute(OptionsHelper optionsHelper) throws Exception {
+        this.metaUrl = optionsHelper.getOptionValue(OPTION_META_URL);
+        this.cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
+        final String inputPath = optionsHelper.getOptionValue(OPTION_INPUT_PATH);
+        final String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
+        final String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
+
+        Job job = Job.getInstance();
+        FlinkUtil.modifyFlinkHadoopConfiguration(job); // set dfs.replication=2 and enable compression
+
+        HadoopUtil.deletePath(job.getConfiguration(), new Path(outputPath));
+        final SerializableConfiguration sConf = new SerializableConfiguration(job.getConfiguration());
+        final KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
+
+        final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);
+        final CubeDesc cubeDesc = CubeDescManager.getInstance(envConfig).getCubeDesc(cubeInstance.getDescName());
+        final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
+        final CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSegment, envConfig);
+
+        logger.info("Input path: {}", inputPath);
+        logger.info("Output path: {}", outputPath);
+
+        FlinkUtil.setHadoopConfForCuboid(job, cubeSegment, metaUrl);
+
+        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+        final MeasureAggregators aggregators = new MeasureAggregators(cubeDesc.getMeasures());
+
+        final int totalLevels = cubeSegment.getCuboidScheduler().getBuildLevel();
+        final String[] inputFolders = StringSplitter.split(inputPath, ",");
+        FileSystem fs = HadoopUtil.getWorkingFileSystem();
+        boolean isLegacyMode = false;
+        for (String inputFolder : inputFolders) {
+            Path baseCuboidPath = new Path(BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(inputFolder, 0));
+            if (!fs.exists(baseCuboidPath)) {
+                // the per-level sub-folder doesn't exist, which means this segment's cuboids are all in one folder (not by layer)
+                isLegacyMode = true;
+                break;
+            }
+        }
+
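+        // Two input layouts are handled below: "legacy" (all of a segment's cuboids in a single
+        // folder) and "by-layer" (one sub-folder per cuboid level, matching the layered cubing output).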
+        if (isLegacyMode) {
+            // legacy layout: merge all layers' cuboids at once, which can be resource-heavy
+            List<DataSet<Tuple2<Text, Object[]>>> mergingSegs = Lists.newArrayListWithExpectedSize(inputFolders.length);
+            for (int i = 0; i < inputFolders.length; i++) {
+                String path = inputFolders[i];
+                DataSet segDataSet = FlinkUtil.parseInputPath(path, fs, env, Text.class, Text.class);
+                CubeSegment sourceSegment = findSourceSegment(path, cubeInstance);
+                // re-encode with the merged segment's new dictionaries
+                DataSet<Tuple2<Text, Object[]>> newEncodedDataSet = segDataSet.map(new ReEncodeCuboidFunction(cubeName,
+                        sourceSegment.getUuid(), cubeSegment.getUuid(), metaUrl, sConf));
+                mergingSegs.add(newEncodedDataSet);
+            }
+
+            FileOutputFormat.setOutputPath(job, new Path(outputPath));
+            HadoopOutputFormat<Text, Text> hadoopOF =
+                    new HadoopOutputFormat<>(new SequenceFileOutputFormat<>(), job);
+
+            if (mergingSegs.size() > 0) {
+                DataSet<Tuple2<Text, Object[]>> unionedDataSet = mergingSegs.get(0);
+                for (int i = 1; i < mergingSegs.size(); i++) {
+                    unionedDataSet = unionedDataSet.union(mergingSegs.get(i));
+                }
+
+                unionedDataSet
+                        .groupBy(0)
+                        .reduce(new MeasureReduceFunction(aggregators))
+                        .setParallelism(FlinkUtil.estimateTotalPartitionNum(cubeStatsReader, envConfig))
+                        .map(new ConvertTextMapFunction(sConf, metaUrl, cubeName))
+                        .output(hadoopOF);
+            }
+        } else {
+            // merge by layer
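+            // each level is read from its per-level input folders, merged independently, and
+            // written to its own per-level output folder via a dedicated Hadoop Job instance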
+            for (int level = 0; level <= totalLevels; level++) {
+                List<DataSet<Tuple2<Text, Object[]>>> mergingSegs = Lists.newArrayList();
+                for (int i = 0; i < inputFolders.length; i++) {
+                    String path = inputFolders[i];
+                    CubeSegment sourceSegment = findSourceSegment(path, cubeInstance);
+                    final String cuboidInputPath = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(path, level);
+                    DataSet<Tuple2<Text, Text>> segDataSet = env.createInput(
+                            HadoopInputs.readSequenceFile(Text.class, Text.class, cuboidInputPath));
+                    // re-encode with the merged segment's new dictionaries
+                    DataSet<Tuple2<Text, Object[]>> newEncodedDataSet = segDataSet.map(new ReEncodeCuboidFunction(cubeName,
+                            sourceSegment.getUuid(), cubeSegment.getUuid(), metaUrl, sConf));
+                    mergingSegs.add(newEncodedDataSet);
+                }
+
+                final String cuboidOutputPath = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(outputPath, level);
+
+                Job jobInstanceForEachOutputFormat = Job.getInstance();
+                FlinkUtil.modifyFlinkHadoopConfiguration(jobInstanceForEachOutputFormat); // set dfs.replication=2 and enable compression
+                FlinkUtil.setHadoopConfForCuboid(jobInstanceForEachOutputFormat, cubeSegment, metaUrl);
+
+                FileOutputFormat.setOutputPath(jobInstanceForEachOutputFormat, new Path(cuboidOutputPath));
+                HadoopOutputFormat<Text, Text> hadoopOF =
+                        new HadoopOutputFormat<>(new SequenceFileOutputFormat<>(), jobInstanceForEachOutputFormat);
+
+                if (mergingSegs.size() > 0) {
+                    DataSet<Tuple2<Text, Object[]>> unionedDataSet = mergingSegs.get(0);
+                    for (int i = 1; i < mergingSegs.size(); i++) {
+                        unionedDataSet = unionedDataSet.union(mergingSegs.get(i));
+                    }
+
+                    unionedDataSet
+                            .groupBy(0)
+                            .reduce(new MeasureReduceFunction(aggregators))
+                            .setParallelism(FlinkUtil.estimateTotalPartitionNum(cubeStatsReader, envConfig))
+                            .map(new ConvertTextMapFunction(sConf, metaUrl, cubeName))
+                            .output(hadoopOF);
+                }
+            }
+        }
+
+        env.execute("Merge segments for cube: " + cubeName + ", segment " + segmentId);
+        // output the data size to the console; the job engine will parse and save the metric
+    }
+
+    private CubeSegment findSourceSegment(String filePath, CubeInstance cube) {
+        String jobID = JobBuilderSupport.extractJobIDFromPath(filePath);
+        return CubeInstance.findSegmentWithJobId(jobID, cube);
+    }
+
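+    /**
+     * Re-encodes each cuboid row from the source segment's dictionaries to the merged
+     * segment's dictionaries, emitting (row key, measure objects) pairs.
+     */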
+    private static class ReEncodeCuboidFunction extends RichMapFunction<Tuple2<Text, Text>, Tuple2<Text, Object[]>> {
+        private String cubeName;
+        private String sourceSegmentId;
+        private String mergedSegmentId;
+        private String metaUrl;
+        private SerializableConfiguration conf;
+        private transient KylinConfig kylinConfig;
+        private transient SegmentReEncoder segmentReEncoder = null;
+
+        ReEncodeCuboidFunction(String cubeName, String sourceSegmentId, String mergedSegmentId, String metaUrl,
+                SerializableConfiguration conf) {
+            this.cubeName = cubeName;
+            this.sourceSegmentId = sourceSegmentId;
+            this.mergedSegmentId = mergedSegmentId;
+            this.metaUrl = metaUrl;
+            this.conf = conf;
+        }
+
+        @Override
+        public void open(Configuration parameters) throws Exception {
+            this.kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
+            final CubeInstance cube = CubeManager.getInstance(kylinConfig).getCube(cubeName);
+            final CubeDesc cubeDesc = CubeDescManager.getInstance(kylinConfig).getCubeDesc(cube.getDescName());
+            final CubeSegment sourceSeg = cube.getSegmentById(sourceSegmentId);
+            final CubeSegment mergedSeg = cube.getSegmentById(mergedSegmentId);
+            this.segmentReEncoder = new SegmentReEncoder(cubeDesc, sourceSeg, mergedSeg, kylinConfig);
+        }
+
+        @Override
+        public Tuple2<Text, Object[]> map(Tuple2<Text, Text> textTextTuple2) throws Exception {
+            Pair<Text, Object[]> encodedPair = segmentReEncoder.reEncode2(textTextTuple2.f0, textTextTuple2.f1);
+            return new Tuple2(encodedPair.getFirst(), encodedPair.getSecond());
+        }
+    }
+
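+    /**
+     * Aggregates the measure columns of rows that share the same cuboid row key.
+     */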
+    private static class MeasureReduceFunction implements ReduceFunction<Tuple2<Text, Object[]>> {
+
+        private MeasureAggregators aggregators;
+
+        public MeasureReduceFunction(MeasureAggregators aggregators) {
+            this.aggregators = aggregators;
+        }
+
+        @Override
+        public Tuple2<Text, Object[]> reduce(Tuple2<Text, Object[]> input1, Tuple2<Text, Object[]> input2) throws Exception {
+            // rows in one group share the same cuboid row key, so keep input1's key
+            Object[] measureObjs = new Object[input1.f1.length];
+            aggregators.aggregate(input1.f1, input2.f1, measureObjs);
+            return new Tuple2<>(input1.f0, measureObjs);
+        }
+    }
+
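+    /**
+     * Serializes the aggregated measure objects back into a Text value with
+     * BufferedMeasureCodec so the result can be written out as a SequenceFile.
+     */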
+    private static class ConvertTextMapFunction extends RichMapFunction<Tuple2<Text, Object[]>, Tuple2<Text, Text>> {
+
+        private BufferedMeasureCodec codec;
+        private SerializableConfiguration sConf;
+        private String metaUrl;
+        private String cubeName;
+
+        public ConvertTextMapFunction(SerializableConfiguration sConf, String metaUrl, String cubeName) {
+            this.sConf = sConf;
+            this.metaUrl = metaUrl;
+            this.cubeName = cubeName;
+        }
+
+        @Override
+        public void open(Configuration parameters) throws Exception {
+            KylinConfig kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
+            try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
+                    .setAndUnsetThreadLocalConfig(kylinConfig)) {
+                CubeDesc desc = CubeDescManager.getInstance(kylinConfig).getCubeDesc(cubeName);
+                codec = new BufferedMeasureCodec(desc.getMeasures());
+            }
+        }
+
+        @Override
+        public Tuple2<Text, Text> map(Tuple2<Text, Object[]> tuple2) throws Exception {
+            ByteBuffer valueBuf = codec.encode(tuple2.f1);
+            byte[] encodedBytes = new byte[valueBuf.position()];
+            System.arraycopy(valueBuf.array(), 0, encodedBytes, 0, valueBuf.position());
+            return new Tuple2<>(tuple2.f0, new Text(encodedBytes));
+        }
+
+    }
+
+}