You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2019/03/12 06:23:55 UTC
[kylin] branch engine-flink updated: KYLIN-3848 Flink cubing step :
build by layer
This is an automated email from the ASF dual-hosted git repository.
shaofengshi pushed a commit to branch engine-flink
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/engine-flink by this push:
new d4f8eb2 KYLIN-3848 Flink cubing step : build by layer
d4f8eb2 is described below
commit d4f8eb28c05438daa68abfea28b4376d5b076668
Author: yanghua <ya...@gmail.com>
AuthorDate: Mon Mar 11 22:46:53 2019 +0800
KYLIN-3848 Flink cubing step : build by layer
---
.../org/apache/kylin/common/KylinConfigBase.java | 48 ++-
.../kylin/engine/flink/FlinkCubingByLayer.java | 434 +++++++++++++++++++++
.../org/apache/kylin/engine/flink/FlinkUtil.java | 156 ++++++++
3 files changed, 637 insertions(+), 1 deletion(-)
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 81979dc..daae366 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -105,6 +105,22 @@ public abstract class KylinConfigBase implements Serializable {
return getKylinHome() + File.separator + "spark";
}
+ public static String getFlinkHome() {
+ String flinkHome = System.getenv("FLINK_HOME");
+ if (StringUtils.isNotEmpty(flinkHome)) {
+ logger.info("FLINK_HOME was set to {}", flinkHome);
+ return flinkHome;
+ }
+
+ flinkHome = System.getProperty("FLINK_HOME");
+ if (StringUtils.isNotEmpty(flinkHome)) {
+ logger.info("FLINK_HOME was set to {}", flinkHome);
+ return flinkHome;
+ }
+
+ return getKylinHome() + File.separator + "flink";
+ }
+
public static String getTempDir() {
return System.getProperty("java.io.tmpdir");
}
@@ -1228,6 +1244,8 @@ public abstract class KylinConfigBase implements Serializable {
r.put(0, "org.apache.kylin.engine.mr.MRBatchCubingEngine"); //IEngineAware.ID_MR_V1
r.put(2, "org.apache.kylin.engine.mr.MRBatchCubingEngine2"); //IEngineAware.ID_MR_V2
r.put(4, "org.apache.kylin.engine.spark.SparkBatchCubingEngine2"); //IEngineAware.ID_SPARK
+ r.put(5, "org.apache.kylin.engine.flink.FlinkBatchCubingEngine2"); //IEngineAware.ID_FLINK
+
r.putAll(convertKeyToInteger(getPropertiesByPrefix("kylin.engine.provider.")));
return r;
}
@@ -1278,10 +1296,18 @@ public abstract class KylinConfigBase implements Serializable {
return getPropertiesByPrefix("kylin.engine.spark-conf.");
}
+ public Map<String, String> getFlinkConfigOverride() {
+ return getPropertiesByPrefix("kylin.engine.flink-conf.");
+ }
+
public Map<String, String> getSparkConfigOverrideWithSpecificName(String configName) {
return getPropertiesByPrefix("kylin.engine.spark-conf-" + configName + ".");
}
+ public Map<String, String> getFlinkConfigOverrideWithSpecificName(String configName) {
+ return getPropertiesByPrefix("kylin.engine.flink-conf-" + configName + ".");
+ }
+
public double getDefaultHadoopJobReducerInputMB() {
return Double.parseDouble(getOptional("kylin.engine.mr.reduce-input-mb", "500"));
}
@@ -1353,18 +1379,34 @@ public abstract class KylinConfigBase implements Serializable {
return getOptional("kylin.engine.spark.additional-jars", "");
}
+ public String getFlinkAdditionalJars() {
+ return getOptional("kylin.engine.flink.additional-jars", "");
+ }
+
public float getSparkRDDPartitionCutMB() {
return Float.parseFloat(getOptional("kylin.engine.spark.rdd-partition-cut-mb", "10.0"));
}
+ public float getFlinkPartitionCutMB() {
+ return Float.parseFloat(getOptional("kylin.engine.flink.partition-cut-mb", "10.0"));
+ }
+
public int getSparkMinPartition() {
return Integer.parseInt(getOptional("kylin.engine.spark.min-partition", "1"));
}
+ public int getFlinkMinPartition() {
+ return Integer.parseInt(getOptional("kylin.engine.flink.min-partition", "1"));
+ }
+
public int getSparkMaxPartition() {
return Integer.parseInt(getOptional("kylin.engine.spark.max-partition", "5000"));
}
+ public int getFlinkMaxPartition() {
+ return Integer.parseInt(getOptional("kylin.engine.spark.max-partition", "5000"));
+ }
+
public String getSparkStorageLevel() {
return getOptional("kylin.engine.spark.storage-level", "MEMORY_AND_DISK_SER");
}
@@ -1372,6 +1414,10 @@ public abstract class KylinConfigBase implements Serializable {
public boolean isSparkSanityCheckEnabled() {
return Boolean.parseBoolean(getOptional("kylin.engine.spark.sanity-check-enabled", FALSE));
}
+
+ public boolean isFlinkSanityCheckEnabled() {
+ return Boolean.parseBoolean(getOptional("kylin.engine.flink.sanity-check-enabled", FALSE));
+ }
// ============================================================================
// ENGINE.LIVY
@@ -1396,7 +1442,7 @@ public abstract class KylinConfigBase implements Serializable {
public Map<String, String> getLivyMap() {
return getPropertiesByPrefix("kylin.engine.livy-conf.livy-map.");
}
-
+
// ============================================================================
// QUERY
// ============================================================================
diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingByLayer.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingByLayer.java
new file mode 100644
index 0000000..ba1f233
--- /dev/null
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingByLayer.java
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.engine.flink;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Collector;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.AbstractApplication;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.OptionsHelper;
+import org.apache.kylin.cube.CubeDescManager;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.common.RowKeySplitter;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.kv.AbstractRowKeyEncoder;
+import org.apache.kylin.cube.kv.RowKeyEncoderProvider;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich;
+import org.apache.kylin.engine.EngineFactory;
+import org.apache.kylin.engine.mr.BatchCubingJobBuilder2;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BaseCuboidBuilder;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.CubeStatsReader;
+import org.apache.kylin.engine.mr.common.NDCuboidBuilder;
+import org.apache.kylin.engine.mr.common.SerializableConfiguration;
+import org.apache.kylin.job.JoinedFlatTable;
+import org.apache.kylin.measure.BufferedMeasureCodec;
+import org.apache.kylin.measure.MeasureAggregators;
+import org.apache.kylin.measure.MeasureIngester;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Locale;
+
+/**
+ * Flink application to build cube with the "by-layer" algorithm. Only support source data from Hive; Metadata in HBase.
+ */
+public class FlinkCubingByLayer extends AbstractApplication implements Serializable {
+
+ protected static final Logger logger = LoggerFactory.getLogger(FlinkCubingByLayer.class);
+
+ public static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg()
+ .isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME);
+ public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segment").hasArg().isRequired(true)
+ .withDescription("Cube Segment Id").create("segmentId");
+ public static final Option OPTION_META_URL = OptionBuilder.withArgName("metaUrl").hasArg().isRequired(true)
+ .withDescription("HDFS metadata url").create("metaUrl");
+ public static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg()
+ .isRequired(true).withDescription("Cube output path").create(BatchConstants.ARG_OUTPUT);
+ public static final Option OPTION_INPUT_TABLE = OptionBuilder.withArgName("hiveTable").hasArg().isRequired(true)
+ .withDescription("Hive Intermediate Table").create("hiveTable");
+ public static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg()
+ .isRequired(true).withDescription("Hive Intermediate Table PATH").create(BatchConstants.ARG_INPUT);
+
+ private Options options;
+
+ public FlinkCubingByLayer() {
+ options = new Options();
+ options.addOption(OPTION_INPUT_TABLE);
+ options.addOption(OPTION_INPUT_PATH);
+ options.addOption(OPTION_CUBE_NAME);
+ options.addOption(OPTION_SEGMENT_ID);
+ options.addOption(OPTION_META_URL);
+ options.addOption(OPTION_OUTPUT_PATH);
+ }
+
+ @Override
+ protected Options getOptions() {
+ return options;
+ }
+
+ @Override
+ protected void execute(OptionsHelper optionsHelper) throws Exception {
+ String metaUrl = optionsHelper.getOptionValue(OPTION_META_URL);
+ String hiveTable = optionsHelper.getOptionValue(OPTION_INPUT_TABLE);
+ String inputPath = optionsHelper.getOptionValue(OPTION_INPUT_PATH);
+ String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
+ String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
+ String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
+
+ Job job = Job.getInstance();
+
+ HadoopUtil.deletePath(job.getConfiguration(), new Path(outputPath));
+
+ final SerializableConfiguration sConf = new SerializableConfiguration(job.getConfiguration());
+ KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
+
+ final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);
+ final CubeDesc cubeDesc = cubeInstance.getDescriptor();
+ final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
+
+ logger.info("DataSet input path : {}", inputPath);
+ logger.info("DataSet output path : {}", outputPath);
+
+ FlinkUtil.setHadoopConfForCuboid(job, cubeSegment, metaUrl);
+
+ int countMeasureIndex = 0;
+ for (MeasureDesc measureDesc : cubeDesc.getMeasures()) {
+ if (measureDesc.getFunction().isCount() == true) {
+ break;
+ } else {
+ countMeasureIndex++;
+ }
+ }
+
+ final CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSegment, envConfig);
+ boolean[] needAggr = new boolean[cubeDesc.getMeasures().size()];
+ boolean allNormalMeasure = true;
+ for (int i = 0; i < cubeDesc.getMeasures().size(); i++) {
+ needAggr[i] = !cubeDesc.getMeasures().get(i).getFunction().getMeasureType().onlyAggrInBaseCuboid();
+ allNormalMeasure = allNormalMeasure && needAggr[i];
+ }
+
+ logger.info("All measure are normal (agg on all cuboids) ? : " + allNormalMeasure);
+
+ boolean isSequenceFile = JoinedFlatTable.SEQUENCEFILE.equalsIgnoreCase(envConfig.getFlatTableStorageFormat());
+
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<String[]> hiveDataSet = FlinkUtil.readHiveRecords(isSequenceFile, env, inputPath, hiveTable, job);
+
+ DataSet<Tuple2<ByteArray, Object[]>> encodedBaseDataSet = hiveDataSet.map(
+ new EncodeBaseCuboidMapFunction(cubeName, segmentId, metaUrl, sConf));
+
+ Long totalCount = 0L;
+ if (envConfig.isFlinkSanityCheckEnabled()) {
+ totalCount = encodedBaseDataSet.count();
+ }
+
+ final BaseCuboidReduceFunction baseCuboidReducerFunction = new BaseCuboidReduceFunction(cubeName, metaUrl, sConf);
+
+ BaseCuboidReduceFunction reducerFunction = baseCuboidReducerFunction;
+ if (!allNormalMeasure) {
+ reducerFunction = new CuboidReduceFunction(cubeName, metaUrl, sConf, needAggr);
+ }
+
+ final int totalLevels = cubeSegment.getCuboidScheduler().getBuildLevel();
+ DataSet<Tuple2<ByteArray, Object[]>>[] allDataSets = new DataSet[totalLevels + 1];
+ int level = 0;
+ int partition = FlinkUtil.estimateLayerPartitionNum(level, cubeStatsReader, envConfig);
+
+ // aggregate to calculate base cuboid
+ allDataSets[0] = encodedBaseDataSet.groupBy(0).reduce(baseCuboidReducerFunction).setParallelism(partition);
+
+ sinkToHDFS(allDataSets[0], metaUrl, cubeName, cubeSegment, outputPath, 0, Job.getInstance(), envConfig);
+
+ CuboidFlatMapFunction flatMapFunction = new CuboidFlatMapFunction(cubeName, segmentId, metaUrl, sConf);
+
+ for (level = 1; level <= totalLevels; level++) {
+ partition = FlinkUtil.estimateLayerPartitionNum(level, cubeStatsReader, envConfig);
+
+ allDataSets[level] = allDataSets[level - 1].flatMap(flatMapFunction).groupBy(0).reduce(reducerFunction).setParallelism(partition);
+ if (envConfig.isFlinkSanityCheckEnabled()) {
+ sanityCheck(allDataSets[level], totalCount, level, cubeStatsReader, countMeasureIndex);
+ }
+ sinkToHDFS(allDataSets[level], metaUrl, cubeName, cubeSegment, outputPath, level, Job.getInstance(), envConfig);
+ }
+
+ env.execute("Cubing for : " + cubeName + " segment " + segmentId);
+ logger.info("Finished on calculating all level cuboids.");
+ }
+
+ private void sinkToHDFS(
+ final DataSet<Tuple2<ByteArray, Object[]>> dataSet,
+ final String metaUrl,
+ final String cubeName,
+ final CubeSegment cubeSeg,
+ final String hdfsBaseLocation,
+ final int level,
+ final Job job,
+ final KylinConfig kylinConfig) throws Exception {
+ final String cuboidOutputPath = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(hdfsBaseLocation, level);
+ final SerializableConfiguration sConf = new SerializableConfiguration(job.getConfiguration());
+ FlinkUtil.setHadoopConfForCuboid(job, cubeSeg, metaUrl);
+
+ HadoopOutputFormat<Text, Text> hadoopOF =
+ new HadoopOutputFormat<>(new SequenceFileOutputFormat<>(), job);
+
+ SequenceFileOutputFormat.setOutputPath(job, new Path(cuboidOutputPath));
+
+ dataSet.map(new RichMapFunction<Tuple2<ByteArray, Object[]>, Tuple2<Text, Text>>() {
+
+ BufferedMeasureCodec codec;
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ KylinConfig kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
+ try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
+ .setAndUnsetThreadLocalConfig(kylinConfig)) {
+ CubeDesc desc = CubeDescManager.getInstance(kylinConfig).getCubeDesc(cubeName);
+ codec = new BufferedMeasureCodec(desc.getMeasures());
+ }
+ }
+
+ @Override
+ public Tuple2<Text, Text> map(Tuple2<ByteArray, Object[]> tuple2) throws Exception {
+ ByteBuffer valueBuf = codec.encode(tuple2.f1);
+ org.apache.hadoop.io.Text textResult = new org.apache.hadoop.io.Text();
+ textResult.set(valueBuf.array(), 0, valueBuf.position());
+ return new Tuple2<>(new org.apache.hadoop.io.Text(tuple2.f0.array()), textResult);
+ }
+
+ }).output(hadoopOF);
+
+ logger.info("Persisting DataSet for level " + level + " into " + cuboidOutputPath);
+ }
+
+ private void sanityCheck(DataSet<Tuple2<ByteArray, Object[]>> dataSet, Long totalCount, int thisLevel,
+ CubeStatsReader cubeStatsReader, final int countMeasureIndex) throws Exception {
+ int thisCuboidNum = cubeStatsReader.getCuboidsByLayer(thisLevel).size();
+ Long count2 = getDataSetCountSum(dataSet, countMeasureIndex);
+ if (count2 != totalCount * thisCuboidNum) {
+ throw new IllegalStateException(
+ String.format(Locale.ROOT, "Sanity check failed, level %s, total count(*) is %s; cuboid number %s",
+ thisLevel, count2, thisCuboidNum));
+ } else {
+ logger.info("sanity check success for level " + thisLevel + ", count(*) is " + (count2 / thisCuboidNum));
+ }
+ }
+
+ private Long getDataSetCountSum(DataSet<Tuple2<ByteArray, Object[]>> dataSet, final int countMeasureIndex) throws Exception {
+ Long count = dataSet.map((Tuple2<ByteArray, Object[]> byteArrayTuple2) ->
+ new Tuple2<>(byteArrayTuple2.f0, (Long) byteArrayTuple2.f1[countMeasureIndex])
+ ).sum(1).count();
+
+ return count;
+ }
+
+ /**
+ * A map function used to encode the base cuboid.
+ */
+ private static class EncodeBaseCuboidMapFunction extends RichMapFunction<String[], Tuple2<ByteArray, Object[]>> {
+
+ private BaseCuboidBuilder baseCuboidBuilder = null;
+ private String cubeName;
+ private String segmentId;
+ private String metaUrl;
+ private SerializableConfiguration conf;
+
+ public EncodeBaseCuboidMapFunction(String cubeName, String segmentId, String metaUrl, SerializableConfiguration conf) {
+ this.cubeName = cubeName;
+ this.segmentId = segmentId;
+ this.metaUrl = metaUrl;
+ this.conf = conf;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
+ try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
+ .setAndUnsetThreadLocalConfig(kConfig)) {
+ CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
+ CubeDesc cubeDesc = cubeInstance.getDescriptor();
+ CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
+ CubeJoinedFlatTableEnrich interDesc = new CubeJoinedFlatTableEnrich(
+ EngineFactory.getJoinedFlatTableDesc(cubeSegment), cubeDesc);
+ long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
+ Cuboid baseCuboid = Cuboid.findForMandatory(cubeDesc, baseCuboidId);
+ baseCuboidBuilder = new BaseCuboidBuilder(kConfig, cubeDesc, cubeSegment, interDesc,
+ AbstractRowKeyEncoder.createInstance(cubeSegment, baseCuboid),
+ MeasureIngester.create(cubeDesc.getMeasures()), cubeSegment.buildDictionaryMap());
+ }
+ }
+
+ @Override
+ public Tuple2<ByteArray, Object[]> map(String[] rowArray) throws Exception {
+ baseCuboidBuilder.resetAggrs();
+ byte[] rowKey = baseCuboidBuilder.buildKey(rowArray);
+ Object[] result = baseCuboidBuilder.buildValueObjects(rowArray);
+ return new Tuple2<>(new ByteArray(rowKey), result);
+ }
+ }
+
+ /**
+ * A reduce function used to aggregate base cuboid.
+ */
+ private static class BaseCuboidReduceFunction extends RichReduceFunction<Tuple2<ByteArray, Object[]>> {
+
+ protected String cubeName;
+ protected String metaUrl;
+ protected CubeDesc cubeDesc;
+ protected int measureNum;
+ protected MeasureAggregators aggregators;
+ protected SerializableConfiguration conf;
+
+ public BaseCuboidReduceFunction(String cubeName, String metaUrl, SerializableConfiguration conf) {
+ this.cubeName = cubeName;
+ this.metaUrl = metaUrl;
+ this.conf = conf;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
+ try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
+ .setAndUnsetThreadLocalConfig(kConfig)) {
+ CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
+ cubeDesc = cubeInstance.getDescriptor();
+ aggregators = new MeasureAggregators(cubeDesc.getMeasures());
+ measureNum = cubeDesc.getMeasures().size();
+ }
+ }
+
+ @Override
+ public Tuple2<ByteArray, Object[]> reduce(Tuple2<ByteArray, Object[]> input1,
+ Tuple2<ByteArray, Object[]> input2) throws Exception {
+ Object[] result = new Object[measureNum];
+ aggregators.aggregate(input1.f1, input2.f1, result);
+ return new Tuple2<>(input1.f0, result);
+ }
+
+ }
+
+ /**
+ * A reduce function does aggregation based on boolean flag array.
+ */
+ private static class CuboidReduceFunction extends BaseCuboidReduceFunction {
+
+ private boolean[] needAgg;
+
+ public CuboidReduceFunction(String cubeName, String metaUrl, SerializableConfiguration conf, boolean[] needAgg) {
+ super(cubeName, metaUrl, conf);
+ this.needAgg = needAgg;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ }
+
+ @Override
+ public Tuple2<ByteArray, Object[]> reduce(Tuple2<ByteArray, Object[]> input1,
+ Tuple2<ByteArray, Object[]> input2) throws Exception {
+ Object[] result = new Object[measureNum];
+ aggregators.aggregate(input1.f1, input2.f1, result, needAgg);
+ return new Tuple2<>(input1.f0, result);
+ }
+ }
+
+ /**
+ * A flatmap function which extracts a cuboid's children cuboids and emit them to the down stream.
+ */
+ private static class CuboidFlatMapFunction extends RichFlatMapFunction<Tuple2<ByteArray, Object[]>, Tuple2<ByteArray, Object[]>> {
+
+ private String cubeName;
+ private String segmentId;
+ private String metaUrl;
+ private CubeSegment cubeSegment;
+ private CubeDesc cubeDesc;
+ private NDCuboidBuilder ndCuboidBuilder;
+ private RowKeySplitter rowKeySplitter;
+ private SerializableConfiguration conf;
+
+ public CuboidFlatMapFunction(String cubeName, String segmentId, String metaUrl, SerializableConfiguration conf) {
+ this.cubeName = cubeName;
+ this.segmentId = segmentId;
+ this.metaUrl = metaUrl;
+ this.conf = conf;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
+ try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
+ .setAndUnsetThreadLocalConfig(kConfig)) {
+ CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
+ this.cubeSegment = cubeInstance.getSegmentById(segmentId);
+ this.cubeDesc = cubeInstance.getDescriptor();
+ this.ndCuboidBuilder = new NDCuboidBuilder(cubeSegment, new RowKeyEncoderProvider(cubeSegment));
+ this.rowKeySplitter = new RowKeySplitter(cubeSegment);
+ }
+ }
+
+ @Override
+ public void flatMap(Tuple2<ByteArray, Object[]> tuple2, Collector<Tuple2<ByteArray, Object[]>> collector) throws Exception {
+ byte[] key = tuple2.f0.array();
+ long cuboidId = rowKeySplitter.parseCuboid(key);
+ final List<Long> myChildren = cubeSegment.getCuboidScheduler().getSpanningCuboid(cuboidId);
+
+ // if still empty or null
+ if (myChildren == null || myChildren.size() == 0) {
+ return;
+ }
+ rowKeySplitter.split(key);
+ final Cuboid parentCuboid = Cuboid.findForMandatory(cubeDesc, cuboidId);
+
+ for (Long child : myChildren) {
+ Cuboid childCuboid = Cuboid.findForMandatory(cubeDesc, child);
+ ByteArray result = ndCuboidBuilder.buildKey2(parentCuboid, childCuboid,
+ rowKeySplitter.getSplitBuffers());
+
+ collector.collect(new Tuple2<>(result, tuple2.f1));
+ }
+ }
+ }
+
+}
diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkUtil.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkUtil.java
new file mode 100644
index 0000000..4473a44
--- /dev/null
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkUtil.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.engine.flink;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.hadoopcompatibility.HadoopInputs;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.StringUtil;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.EngineFactory;
+import org.apache.kylin.engine.mr.IMROutput2;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.CubeStatsReader;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+import org.apache.kylin.source.SourceManager;
+import org.apache.kylin.storage.StorageFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * A helper class which contains some util methods used by Flink cube engine.
+ */
+public class FlinkUtil {
+
+ public static IFlinkInput.IFlinkBatchCubingInputSide getBatchCubingInputSide(CubeSegment seg) {
+ IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(seg);
+ return SourceManager.createEngineAdapter(seg, IFlinkInput.class).getBatchCubingInputSide(flatDesc);
+ }
+
+ public static IFlinkOutput.IFlinkBatchCubingOutputSide getBatchCubingOutputSide(CubeSegment seg) {
+ return StorageFactory.createEngineAdapter(seg, IFlinkOutput.class).getBatchCubingOutputSide(seg);
+ }
+
+ public static IFlinkOutput.IFlinkBatchMergeOutputSide getBatchMergeOutputSide2(CubeSegment seg) {
+ return StorageFactory.createEngineAdapter(seg, IFlinkOutput.class).getBatchMergeOutputSide(seg);
+ }
+
+ public static IFlinkInput.IFlinkBatchMergeInputSide getBatchMergeInputSide(CubeSegment seg) {
+ return SourceManager.createEngineAdapter(seg, IFlinkInput.class).getBatchMergeInputSide(seg);
+ }
+
+ public static IMROutput2.IMRBatchOptimizeOutputSide2 getBatchOptimizeOutputSide2(CubeSegment seg) {
+ return StorageFactory.createEngineAdapter(seg, IMROutput2.class).getBatchOptimizeOutputSide(seg);
+ }
+
+ public static DataSet parseInputPath(String inputPath, FileSystem fs, ExecutionEnvironment env, Class keyClass,
+ Class valueClass) throws IOException {
+ List<String> inputFolders = Lists.newArrayList();
+ Path inputHDFSPath = new Path(inputPath);
+ FileStatus[] fileStatuses = fs.listStatus(inputHDFSPath);
+ boolean hasDir = false;
+ for (FileStatus stat : fileStatuses) {
+ if (stat.isDirectory() && !stat.getPath().getName().startsWith("_")) {
+ hasDir = true;
+ inputFolders.add(stat.getPath().toString());
+ }
+ }
+
+ if (!hasDir) {
+ return env.createInput(HadoopInputs.readSequenceFile(keyClass, valueClass, inputHDFSPath.toString()));
+ }
+
+ return env.createInput(HadoopInputs.readSequenceFile(keyClass, valueClass, StringUtil.join(inputFolders, ",")));
+ }
+
+
+ public static int estimateLayerPartitionNum(int level, CubeStatsReader statsReader, KylinConfig kylinConfig) {
+ double baseCuboidSize = statsReader.estimateLayerSize(level);
+ float partitionCutMB = kylinConfig.getFlinkPartitionCutMB();
+ int partition = (int) (baseCuboidSize / partitionCutMB);
+ partition = Math.max(kylinConfig.getFlinkMinPartition(), partition);
+ partition = Math.min(kylinConfig.getFlinkMaxPartition(), partition);
+ return partition;
+ }
+
+ public static int estimateTotalPartitionNum(CubeStatsReader statsReader, KylinConfig kylinConfig) {
+ double totalSize = 0;
+ for (double x: statsReader.getCuboidSizeMap().values()) {
+ totalSize += x;
+ }
+ float partitionCutMB = kylinConfig.getFlinkPartitionCutMB();
+ int partition = (int) (totalSize / partitionCutMB);
+ partition = Math.max(kylinConfig.getFlinkMinPartition(), partition);
+ partition = Math.min(kylinConfig.getFlinkMaxPartition(), partition);
+ return partition;
+ }
+
+ public static void setHadoopConfForCuboid(Job job, CubeSegment segment, String metaUrl) throws Exception {
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(Text.class);
+ job.setOutputFormatClass(SequenceFileOutputFormat.class);
+ }
+
+ public static void modifyFlinkHadoopConfiguration(Job job) throws Exception {
+ job.getConfiguration().set("dfs.replication", "2"); // cuboid intermediate files, replication=2
+ job.getConfiguration().set("mapreduce.output.fileoutputformat.compress", "true");
+ job.getConfiguration().set("mapreduce.output.fileoutputformat.compress.type", "BLOCK");
+ job.getConfiguration().set("mapreduce.output.fileoutputformat.compress.codec", "org.apache.hadoop.io.compress.DefaultCodec"); // or org.apache.hadoop.io.compress.SnappyCodec
+ }
+
+ public static DataSet<String[]> readHiveRecords(boolean isSequenceFile, ExecutionEnvironment env, String inputPath, String hiveTable, Job job) throws IOException {
+ DataSet<String[]> recordDataSet;
+
+ if (isSequenceFile) {
+ recordDataSet = env
+ .createInput(HadoopInputs.readHadoopFile(new SequenceFileInputFormat(), BytesWritable.class, Text.class, inputPath, job),
+ TypeInformation.of(new TypeHint<Tuple2<BytesWritable, Text>>() {}))
+ .map(new MapFunction<Tuple2<BytesWritable, Text>, String[]>() {
+ @Override
+ public String[] map(Tuple2<BytesWritable, Text> tuple2) throws Exception {
+
+ System.out.println("read records from hive.");
+
+ String s = Bytes.toString(tuple2.f1.getBytes(), 0, tuple2.f1.getLength());
+ return s.split(BatchConstants.SEQUENCE_FILE_DEFAULT_DELIMITER);
+ }
+ });
+ } else {
+ throw new UnsupportedOperationException("Currently, Flink does not support read hive table directly.");
+ }
+
+ return recordDataSet;
+ }
+
+}