You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2019/03/12 06:23:55 UTC

[kylin] branch engine-flink updated: KYLIN-3848 Flink cubing step : build by layer

This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch engine-flink
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/engine-flink by this push:
     new d4f8eb2  KYLIN-3848 Flink cubing step : build by layer
d4f8eb2 is described below

commit d4f8eb28c05438daa68abfea28b4376d5b076668
Author: yanghua <ya...@gmail.com>
AuthorDate: Mon Mar 11 22:46:53 2019 +0800

    KYLIN-3848 Flink cubing step : build by layer
---
 .../org/apache/kylin/common/KylinConfigBase.java   |  48 ++-
 .../kylin/engine/flink/FlinkCubingByLayer.java     | 434 +++++++++++++++++++++
 .../org/apache/kylin/engine/flink/FlinkUtil.java   | 156 ++++++++
 3 files changed, 637 insertions(+), 1 deletion(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 81979dc..daae366 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -105,6 +105,22 @@ public abstract class KylinConfigBase implements Serializable {
         return getKylinHome() + File.separator + "spark";
     }
 
+    public static String getFlinkHome() {
+        String flinkHome = System.getenv("FLINK_HOME");
+        if (StringUtils.isNotEmpty(flinkHome)) {
+            logger.info("FLINK_HOME was set to {}", flinkHome);
+            return flinkHome;
+        }
+
+        flinkHome = System.getProperty("FLINK_HOME");
+        if (StringUtils.isNotEmpty(flinkHome)) {
+            logger.info("FLINK_HOME was set to {}", flinkHome);
+            return flinkHome;
+        }
+
+        return getKylinHome() + File.separator + "flink";
+    }
+
     public static String getTempDir() {
         return System.getProperty("java.io.tmpdir");
     }
@@ -1228,6 +1244,8 @@ public abstract class KylinConfigBase implements Serializable {
         r.put(0, "org.apache.kylin.engine.mr.MRBatchCubingEngine"); //IEngineAware.ID_MR_V1
         r.put(2, "org.apache.kylin.engine.mr.MRBatchCubingEngine2"); //IEngineAware.ID_MR_V2
         r.put(4, "org.apache.kylin.engine.spark.SparkBatchCubingEngine2"); //IEngineAware.ID_SPARK
+        r.put(5, "org.apache.kylin.engine.flink.FlinkBatchCubingEngine2"); //IEngineAware.ID_FLINK
+
         r.putAll(convertKeyToInteger(getPropertiesByPrefix("kylin.engine.provider.")));
         return r;
     }
@@ -1278,10 +1296,18 @@ public abstract class KylinConfigBase implements Serializable {
         return getPropertiesByPrefix("kylin.engine.spark-conf.");
     }
 
+    public Map<String, String> getFlinkConfigOverride() {
+        return getPropertiesByPrefix("kylin.engine.flink-conf.");
+    }
+
     public Map<String, String> getSparkConfigOverrideWithSpecificName(String configName) {
         return getPropertiesByPrefix("kylin.engine.spark-conf-" + configName + ".");
     }
 
+    public Map<String, String> getFlinkConfigOverrideWithSpecificName(String configName) {
+        return getPropertiesByPrefix("kylin.engine.flink-conf-" + configName + ".");
+    }
+
     public double getDefaultHadoopJobReducerInputMB() {
         return Double.parseDouble(getOptional("kylin.engine.mr.reduce-input-mb", "500"));
     }
@@ -1353,18 +1379,34 @@ public abstract class KylinConfigBase implements Serializable {
         return getOptional("kylin.engine.spark.additional-jars", "");
     }
 
+    public String getFlinkAdditionalJars() {
+        return getOptional("kylin.engine.flink.additional-jars", "");
+    }
+
     public float getSparkRDDPartitionCutMB() {
         return Float.parseFloat(getOptional("kylin.engine.spark.rdd-partition-cut-mb", "10.0"));
     }
 
+    public float getFlinkPartitionCutMB() {
+        return Float.parseFloat(getOptional("kylin.engine.flink.partition-cut-mb", "10.0"));
+    }
+
     public int getSparkMinPartition() {
         return Integer.parseInt(getOptional("kylin.engine.spark.min-partition", "1"));
     }
 
+    public int getFlinkMinPartition() {
+        return Integer.parseInt(getOptional("kylin.engine.flink.min-partition", "1"));
+    }
+
     public int getSparkMaxPartition() {
         return Integer.parseInt(getOptional("kylin.engine.spark.max-partition", "5000"));
     }
 
+    public int getFlinkMaxPartition() {
+        return Integer.parseInt(getOptional("kylin.engine.spark.max-partition", "5000"));
+    }
+
     public String getSparkStorageLevel() {
         return getOptional("kylin.engine.spark.storage-level", "MEMORY_AND_DISK_SER");
     }
@@ -1372,6 +1414,10 @@ public abstract class KylinConfigBase implements Serializable {
     public boolean isSparkSanityCheckEnabled() {
         return Boolean.parseBoolean(getOptional("kylin.engine.spark.sanity-check-enabled", FALSE));
     }
+
+    public boolean isFlinkSanityCheckEnabled() {
+        return Boolean.parseBoolean(getOptional("kylin.engine.flink.sanity-check-enabled", FALSE));
+    }
     
     // ============================================================================
     // ENGINE.LIVY
@@ -1396,7 +1442,7 @@ public abstract class KylinConfigBase implements Serializable {
     public Map<String, String> getLivyMap() {
         return getPropertiesByPrefix("kylin.engine.livy-conf.livy-map.");
     }
-    
+
     // ============================================================================
     // QUERY
     // ============================================================================
diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingByLayer.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingByLayer.java
new file mode 100644
index 0000000..ba1f233
--- /dev/null
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingByLayer.java
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.engine.flink;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Collector;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.AbstractApplication;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.OptionsHelper;
+import org.apache.kylin.cube.CubeDescManager;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.common.RowKeySplitter;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.kv.AbstractRowKeyEncoder;
+import org.apache.kylin.cube.kv.RowKeyEncoderProvider;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich;
+import org.apache.kylin.engine.EngineFactory;
+import org.apache.kylin.engine.mr.BatchCubingJobBuilder2;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BaseCuboidBuilder;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.CubeStatsReader;
+import org.apache.kylin.engine.mr.common.NDCuboidBuilder;
+import org.apache.kylin.engine.mr.common.SerializableConfiguration;
+import org.apache.kylin.job.JoinedFlatTable;
+import org.apache.kylin.measure.BufferedMeasureCodec;
+import org.apache.kylin.measure.MeasureAggregators;
+import org.apache.kylin.measure.MeasureIngester;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Locale;
+
+/**
+ * Flink application to build cube with the "by-layer" algorithm. Only support source data from Hive; Metadata in HBase.
+ */
+public class FlinkCubingByLayer extends AbstractApplication implements Serializable {
+
+    protected static final Logger logger = LoggerFactory.getLogger(FlinkCubingByLayer.class);
+
+    public static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg()
+            .isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME);
+    public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segment").hasArg().isRequired(true)
+            .withDescription("Cube Segment Id").create("segmentId");
+    public static final Option OPTION_META_URL = OptionBuilder.withArgName("metaUrl").hasArg().isRequired(true)
+            .withDescription("HDFS metadata url").create("metaUrl");
+    public static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg()
+            .isRequired(true).withDescription("Cube output path").create(BatchConstants.ARG_OUTPUT);
+    public static final Option OPTION_INPUT_TABLE = OptionBuilder.withArgName("hiveTable").hasArg().isRequired(true)
+            .withDescription("Hive Intermediate Table").create("hiveTable");
+    public static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg()
+            .isRequired(true).withDescription("Hive Intermediate Table PATH").create(BatchConstants.ARG_INPUT);
+
+    private Options options;
+
+    public FlinkCubingByLayer() {
+        options = new Options();
+        options.addOption(OPTION_INPUT_TABLE);
+        options.addOption(OPTION_INPUT_PATH);
+        options.addOption(OPTION_CUBE_NAME);
+        options.addOption(OPTION_SEGMENT_ID);
+        options.addOption(OPTION_META_URL);
+        options.addOption(OPTION_OUTPUT_PATH);
+    }
+
+    @Override
+    protected Options getOptions() {
+        return options;
+    }
+
+    @Override
+    protected void execute(OptionsHelper optionsHelper) throws Exception {
+        String metaUrl = optionsHelper.getOptionValue(OPTION_META_URL);
+        String hiveTable = optionsHelper.getOptionValue(OPTION_INPUT_TABLE);
+        String inputPath = optionsHelper.getOptionValue(OPTION_INPUT_PATH);
+        String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
+        String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
+        String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
+
+        Job job = Job.getInstance();
+
+        HadoopUtil.deletePath(job.getConfiguration(), new Path(outputPath));
+
+        final SerializableConfiguration sConf = new SerializableConfiguration(job.getConfiguration());
+        KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
+
+        final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);
+        final CubeDesc cubeDesc = cubeInstance.getDescriptor();
+        final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
+
+        logger.info("DataSet input path : {}", inputPath);
+        logger.info("DataSet output path : {}", outputPath);
+
+        FlinkUtil.setHadoopConfForCuboid(job, cubeSegment, metaUrl);
+
+        int countMeasureIndex = 0;
+        for (MeasureDesc measureDesc : cubeDesc.getMeasures()) {
+            if (measureDesc.getFunction().isCount() == true) {
+                break;
+            } else {
+                countMeasureIndex++;
+            }
+        }
+
+        final CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSegment, envConfig);
+        boolean[] needAggr = new boolean[cubeDesc.getMeasures().size()];
+        boolean allNormalMeasure = true;
+        for (int i = 0; i < cubeDesc.getMeasures().size(); i++) {
+            needAggr[i] = !cubeDesc.getMeasures().get(i).getFunction().getMeasureType().onlyAggrInBaseCuboid();
+            allNormalMeasure = allNormalMeasure && needAggr[i];
+        }
+
+        logger.info("All measure are normal (agg on all cuboids) ? : " + allNormalMeasure);
+
+        boolean isSequenceFile = JoinedFlatTable.SEQUENCEFILE.equalsIgnoreCase(envConfig.getFlatTableStorageFormat());
+
+        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+        DataSet<String[]> hiveDataSet = FlinkUtil.readHiveRecords(isSequenceFile, env, inputPath, hiveTable, job);
+
+        DataSet<Tuple2<ByteArray, Object[]>> encodedBaseDataSet = hiveDataSet.map(
+                new EncodeBaseCuboidMapFunction(cubeName, segmentId, metaUrl, sConf));
+
+        Long totalCount = 0L;
+        if (envConfig.isFlinkSanityCheckEnabled()) {
+            totalCount = encodedBaseDataSet.count();
+        }
+
+        final BaseCuboidReduceFunction baseCuboidReducerFunction = new BaseCuboidReduceFunction(cubeName, metaUrl, sConf);
+
+        BaseCuboidReduceFunction reducerFunction = baseCuboidReducerFunction;
+        if (!allNormalMeasure) {
+            reducerFunction = new CuboidReduceFunction(cubeName, metaUrl, sConf, needAggr);
+        }
+
+        final int totalLevels = cubeSegment.getCuboidScheduler().getBuildLevel();
+        DataSet<Tuple2<ByteArray, Object[]>>[] allDataSets = new DataSet[totalLevels + 1];
+        int level = 0;
+        int partition = FlinkUtil.estimateLayerPartitionNum(level, cubeStatsReader, envConfig);
+
+        // aggregate to calculate base cuboid
+        allDataSets[0] = encodedBaseDataSet.groupBy(0).reduce(baseCuboidReducerFunction).setParallelism(partition);
+
+        sinkToHDFS(allDataSets[0], metaUrl, cubeName, cubeSegment, outputPath, 0, Job.getInstance(), envConfig);
+
+        CuboidFlatMapFunction flatMapFunction = new CuboidFlatMapFunction(cubeName, segmentId, metaUrl, sConf);
+
+        for (level = 1; level <= totalLevels; level++) {
+            partition = FlinkUtil.estimateLayerPartitionNum(level, cubeStatsReader, envConfig);
+
+            allDataSets[level] = allDataSets[level - 1].flatMap(flatMapFunction).groupBy(0).reduce(reducerFunction).setParallelism(partition);
+            if (envConfig.isFlinkSanityCheckEnabled()) {
+                sanityCheck(allDataSets[level], totalCount, level, cubeStatsReader, countMeasureIndex);
+            }
+            sinkToHDFS(allDataSets[level], metaUrl, cubeName, cubeSegment, outputPath, level, Job.getInstance(), envConfig);
+        }
+
+        env.execute("Cubing for : " + cubeName + " segment " + segmentId);
+        logger.info("Finished on calculating all level cuboids.");
+    }
+
+    private void sinkToHDFS(
+            final DataSet<Tuple2<ByteArray, Object[]>> dataSet,
+            final String metaUrl,
+            final String cubeName,
+            final CubeSegment cubeSeg,
+            final String hdfsBaseLocation,
+            final int level,
+            final Job job,
+            final KylinConfig kylinConfig) throws Exception {
+        final String cuboidOutputPath = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(hdfsBaseLocation, level);
+        final SerializableConfiguration sConf = new SerializableConfiguration(job.getConfiguration());
+        FlinkUtil.setHadoopConfForCuboid(job, cubeSeg, metaUrl);
+
+        HadoopOutputFormat<Text, Text> hadoopOF =
+                new HadoopOutputFormat<>(new SequenceFileOutputFormat<>(), job);
+
+        SequenceFileOutputFormat.setOutputPath(job, new Path(cuboidOutputPath));
+
+        dataSet.map(new RichMapFunction<Tuple2<ByteArray, Object[]>, Tuple2<Text, Text>>() {
+
+            BufferedMeasureCodec codec;
+
+            @Override
+            public void open(Configuration parameters) throws Exception {
+                KylinConfig kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
+                try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
+                        .setAndUnsetThreadLocalConfig(kylinConfig)) {
+                    CubeDesc desc = CubeDescManager.getInstance(kylinConfig).getCubeDesc(cubeName);
+                    codec = new BufferedMeasureCodec(desc.getMeasures());
+                }
+            }
+
+            @Override
+            public Tuple2<Text, Text> map(Tuple2<ByteArray, Object[]> tuple2) throws Exception {
+                ByteBuffer valueBuf = codec.encode(tuple2.f1);
+                org.apache.hadoop.io.Text textResult =  new org.apache.hadoop.io.Text();
+                textResult.set(valueBuf.array(), 0, valueBuf.position());
+                return new Tuple2<>(new org.apache.hadoop.io.Text(tuple2.f0.array()), textResult);
+            }
+
+        }).output(hadoopOF);
+
+        logger.info("Persisting DataSet for level " + level + " into " + cuboidOutputPath);
+    }
+
+    private void sanityCheck(DataSet<Tuple2<ByteArray, Object[]>> dataSet, Long totalCount, int thisLevel,
+            CubeStatsReader cubeStatsReader, final int countMeasureIndex) throws Exception {
+        int thisCuboidNum = cubeStatsReader.getCuboidsByLayer(thisLevel).size();
+        Long count2 = getDataSetCountSum(dataSet, countMeasureIndex);
+        if (count2 != totalCount * thisCuboidNum) {
+            throw new IllegalStateException(
+                    String.format(Locale.ROOT, "Sanity check failed, level %s, total count(*) is %s; cuboid number %s",
+                            thisLevel, count2, thisCuboidNum));
+        } else {
+            logger.info("sanity check success for level " + thisLevel + ", count(*) is " + (count2 / thisCuboidNum));
+        }
+    }
+
+    private Long getDataSetCountSum(DataSet<Tuple2<ByteArray, Object[]>> dataSet, final int countMeasureIndex) throws Exception {
+        Long count = dataSet.map((Tuple2<ByteArray, Object[]> byteArrayTuple2) ->
+                new Tuple2<>(byteArrayTuple2.f0, (Long) byteArrayTuple2.f1[countMeasureIndex])
+        ).sum(1).count();
+
+        return count;
+    }
+
+    /**
+     * A map function used to encode the base cuboid.
+     */
+    private static class EncodeBaseCuboidMapFunction extends RichMapFunction<String[], Tuple2<ByteArray, Object[]>> {
+
+        private BaseCuboidBuilder baseCuboidBuilder = null;
+        private String cubeName;
+        private String segmentId;
+        private String metaUrl;
+        private SerializableConfiguration conf;
+
+        public EncodeBaseCuboidMapFunction(String cubeName, String segmentId, String metaUrl, SerializableConfiguration conf) {
+            this.cubeName = cubeName;
+            this.segmentId = segmentId;
+            this.metaUrl = metaUrl;
+            this.conf = conf;
+        }
+
+        @Override
+        public void open(Configuration parameters) throws Exception {
+            KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
+            try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
+                    .setAndUnsetThreadLocalConfig(kConfig)) {
+                CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
+                CubeDesc cubeDesc = cubeInstance.getDescriptor();
+                CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
+                CubeJoinedFlatTableEnrich interDesc = new CubeJoinedFlatTableEnrich(
+                        EngineFactory.getJoinedFlatTableDesc(cubeSegment), cubeDesc);
+                long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
+                Cuboid baseCuboid = Cuboid.findForMandatory(cubeDesc, baseCuboidId);
+                baseCuboidBuilder = new BaseCuboidBuilder(kConfig, cubeDesc, cubeSegment, interDesc,
+                        AbstractRowKeyEncoder.createInstance(cubeSegment, baseCuboid),
+                        MeasureIngester.create(cubeDesc.getMeasures()), cubeSegment.buildDictionaryMap());
+            }
+        }
+
+        @Override
+        public Tuple2<ByteArray, Object[]> map(String[] rowArray) throws Exception {
+            baseCuboidBuilder.resetAggrs();
+            byte[] rowKey = baseCuboidBuilder.buildKey(rowArray);
+            Object[] result = baseCuboidBuilder.buildValueObjects(rowArray);
+            return new Tuple2<>(new ByteArray(rowKey), result);
+        }
+    }
+
+    /**
+     * A reduce function used to aggregate base cuboid.
+     */
+    private static class BaseCuboidReduceFunction extends RichReduceFunction<Tuple2<ByteArray, Object[]>> {
+
+        protected String cubeName;
+        protected String metaUrl;
+        protected CubeDesc cubeDesc;
+        protected int measureNum;
+        protected MeasureAggregators aggregators;
+        protected SerializableConfiguration conf;
+
+        public BaseCuboidReduceFunction(String cubeName, String metaUrl, SerializableConfiguration conf) {
+            this.cubeName = cubeName;
+            this.metaUrl = metaUrl;
+            this.conf = conf;
+        }
+
+        @Override
+        public void open(Configuration parameters) throws Exception {
+            KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
+            try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
+                    .setAndUnsetThreadLocalConfig(kConfig)) {
+                CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
+                cubeDesc = cubeInstance.getDescriptor();
+                aggregators = new MeasureAggregators(cubeDesc.getMeasures());
+                measureNum = cubeDesc.getMeasures().size();
+            }
+        }
+
+        @Override
+        public Tuple2<ByteArray, Object[]> reduce(Tuple2<ByteArray, Object[]> input1,
+                Tuple2<ByteArray, Object[]> input2) throws Exception {
+            Object[] result = new Object[measureNum];
+            aggregators.aggregate(input1.f1, input2.f1, result);
+            return new Tuple2<>(input1.f0, result);
+        }
+
+    }
+
+    /**
+     * A reduce function does aggregation based on boolean flag array.
+     */
+    private static class CuboidReduceFunction extends BaseCuboidReduceFunction {
+
+        private boolean[] needAgg;
+
+        public CuboidReduceFunction(String cubeName, String metaUrl, SerializableConfiguration conf, boolean[] needAgg) {
+            super(cubeName, metaUrl, conf);
+            this.needAgg = needAgg;
+        }
+
+        @Override
+        public void open(Configuration parameters) throws Exception {
+            super.open(parameters);
+        }
+
+        @Override
+        public Tuple2<ByteArray, Object[]> reduce(Tuple2<ByteArray, Object[]> input1,
+                Tuple2<ByteArray, Object[]> input2) throws Exception {
+            Object[] result = new Object[measureNum];
+            aggregators.aggregate(input1.f1, input2.f1, result, needAgg);
+            return new Tuple2<>(input1.f0, result);
+        }
+    }
+
+    /**
+     * A flatmap function which extracts a cuboid's children cuboids and emit them to the down stream.
+     */
+    private static class CuboidFlatMapFunction extends RichFlatMapFunction<Tuple2<ByteArray, Object[]>, Tuple2<ByteArray, Object[]>> {
+
+        private String cubeName;
+        private String segmentId;
+        private String metaUrl;
+        private CubeSegment cubeSegment;
+        private CubeDesc cubeDesc;
+        private NDCuboidBuilder ndCuboidBuilder;
+        private RowKeySplitter rowKeySplitter;
+        private SerializableConfiguration conf;
+
+        public CuboidFlatMapFunction(String cubeName, String segmentId, String metaUrl, SerializableConfiguration conf) {
+            this.cubeName = cubeName;
+            this.segmentId = segmentId;
+            this.metaUrl = metaUrl;
+            this.conf = conf;
+        }
+
+        @Override
+        public void open(Configuration parameters) throws Exception {
+            KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
+            try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
+                    .setAndUnsetThreadLocalConfig(kConfig)) {
+                CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
+                this.cubeSegment = cubeInstance.getSegmentById(segmentId);
+                this.cubeDesc = cubeInstance.getDescriptor();
+                this.ndCuboidBuilder = new NDCuboidBuilder(cubeSegment, new RowKeyEncoderProvider(cubeSegment));
+                this.rowKeySplitter = new RowKeySplitter(cubeSegment);
+            }
+        }
+
+        @Override
+        public void flatMap(Tuple2<ByteArray, Object[]> tuple2, Collector<Tuple2<ByteArray, Object[]>> collector) throws Exception {
+            byte[] key = tuple2.f0.array();
+            long cuboidId = rowKeySplitter.parseCuboid(key);
+            final List<Long> myChildren = cubeSegment.getCuboidScheduler().getSpanningCuboid(cuboidId);
+
+            // if still empty or null
+            if (myChildren == null || myChildren.size() == 0) {
+                return;
+            }
+            rowKeySplitter.split(key);
+            final Cuboid parentCuboid = Cuboid.findForMandatory(cubeDesc, cuboidId);
+
+            for (Long child : myChildren) {
+                Cuboid childCuboid = Cuboid.findForMandatory(cubeDesc, child);
+                ByteArray result = ndCuboidBuilder.buildKey2(parentCuboid, childCuboid,
+                        rowKeySplitter.getSplitBuffers());
+
+                collector.collect(new Tuple2<>(result, tuple2.f1));
+            }
+        }
+    }
+
+}
diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkUtil.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkUtil.java
new file mode 100644
index 0000000..4473a44
--- /dev/null
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkUtil.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.engine.flink;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.hadoopcompatibility.HadoopInputs;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.StringUtil;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.EngineFactory;
+import org.apache.kylin.engine.mr.IMROutput2;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.CubeStatsReader;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+import org.apache.kylin.source.SourceManager;
+import org.apache.kylin.storage.StorageFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * A helper class which contains some util methods used by Flink cube engine.
+ */
+public class FlinkUtil {
+
+    public static IFlinkInput.IFlinkBatchCubingInputSide getBatchCubingInputSide(CubeSegment seg) {
+        IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(seg);
+        return SourceManager.createEngineAdapter(seg, IFlinkInput.class).getBatchCubingInputSide(flatDesc);
+    }
+
+    public static IFlinkOutput.IFlinkBatchCubingOutputSide getBatchCubingOutputSide(CubeSegment seg) {
+        return StorageFactory.createEngineAdapter(seg, IFlinkOutput.class).getBatchCubingOutputSide(seg);
+    }
+
+    public static IFlinkOutput.IFlinkBatchMergeOutputSide getBatchMergeOutputSide2(CubeSegment seg) {
+        return StorageFactory.createEngineAdapter(seg, IFlinkOutput.class).getBatchMergeOutputSide(seg);
+    }
+
+    public static IFlinkInput.IFlinkBatchMergeInputSide getBatchMergeInputSide(CubeSegment seg) {
+        return SourceManager.createEngineAdapter(seg, IFlinkInput.class).getBatchMergeInputSide(seg);
+    }
+
+    public static IMROutput2.IMRBatchOptimizeOutputSide2 getBatchOptimizeOutputSide2(CubeSegment seg) {
+        return StorageFactory.createEngineAdapter(seg, IMROutput2.class).getBatchOptimizeOutputSide(seg);
+    }
+
+    public static DataSet parseInputPath(String inputPath, FileSystem fs, ExecutionEnvironment env, Class keyClass,
+            Class valueClass) throws IOException {
+        List<String> inputFolders = Lists.newArrayList();
+        Path inputHDFSPath = new Path(inputPath);
+        FileStatus[] fileStatuses = fs.listStatus(inputHDFSPath);
+        boolean hasDir = false;
+        for (FileStatus stat : fileStatuses) {
+            if (stat.isDirectory() && !stat.getPath().getName().startsWith("_")) {
+                hasDir = true;
+                inputFolders.add(stat.getPath().toString());
+            }
+        }
+
+        if (!hasDir) {
+            return env.createInput(HadoopInputs.readSequenceFile(keyClass, valueClass, inputHDFSPath.toString()));
+        }
+
+        return env.createInput(HadoopInputs.readSequenceFile(keyClass, valueClass, StringUtil.join(inputFolders, ",")));
+    }
+
+
+    public static int estimateLayerPartitionNum(int level, CubeStatsReader statsReader, KylinConfig kylinConfig) {
+        double baseCuboidSize = statsReader.estimateLayerSize(level);
+        float partitionCutMB = kylinConfig.getFlinkPartitionCutMB();
+        int partition = (int) (baseCuboidSize / partitionCutMB);
+        partition = Math.max(kylinConfig.getFlinkMinPartition(), partition);
+        partition = Math.min(kylinConfig.getFlinkMaxPartition(), partition);
+        return partition;
+    }
+
+    public static int estimateTotalPartitionNum(CubeStatsReader statsReader, KylinConfig kylinConfig) {
+        double totalSize = 0;
+        for (double x: statsReader.getCuboidSizeMap().values()) {
+            totalSize += x;
+        }
+        float partitionCutMB = kylinConfig.getFlinkPartitionCutMB();
+        int partition = (int) (totalSize / partitionCutMB);
+        partition = Math.max(kylinConfig.getFlinkMinPartition(), partition);
+        partition = Math.min(kylinConfig.getFlinkMaxPartition(), partition);
+        return partition;
+    }
+
+    public static void setHadoopConfForCuboid(Job job, CubeSegment segment, String metaUrl) throws Exception {
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(Text.class);
+        job.setOutputFormatClass(SequenceFileOutputFormat.class);
+    }
+
+    public static void modifyFlinkHadoopConfiguration(Job job) throws Exception {
+        job.getConfiguration().set("dfs.replication", "2"); // cuboid intermediate files, replication=2
+        job.getConfiguration().set("mapreduce.output.fileoutputformat.compress", "true");
+        job.getConfiguration().set("mapreduce.output.fileoutputformat.compress.type", "BLOCK");
+        job.getConfiguration().set("mapreduce.output.fileoutputformat.compress.codec", "org.apache.hadoop.io.compress.DefaultCodec"); // or org.apache.hadoop.io.compress.SnappyCodec
+    }
+
+    public static DataSet<String[]> readHiveRecords(boolean isSequenceFile, ExecutionEnvironment env, String inputPath, String hiveTable, Job job) throws IOException {
+        DataSet<String[]> recordDataSet;
+
+        if (isSequenceFile) {
+            recordDataSet = env
+                    .createInput(HadoopInputs.readHadoopFile(new SequenceFileInputFormat(), BytesWritable.class, Text.class, inputPath, job),
+                            TypeInformation.of(new TypeHint<Tuple2<BytesWritable, Text>>() {}))
+                    .map(new MapFunction<Tuple2<BytesWritable, Text>, String[]>() {
+                        @Override
+                        public String[] map(Tuple2<BytesWritable, Text> tuple2) throws Exception {
+
+                            System.out.println("read records from hive.");
+
+                            String s = Bytes.toString(tuple2.f1.getBytes(), 0, tuple2.f1.getLength());
+                            return s.split(BatchConstants.SEQUENCE_FILE_DEFAULT_DELIMITER);
+                        }
+                    });
+        } else {
+            throw new UnsupportedOperationException("Currently, Flink does not support read hive table directly.");
+        }
+
+        return recordDataSet;
+    }
+
+}