You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ni...@apache.org on 2019/12/23 03:14:37 UTC
[kylin] 24/30: KYLIN-4087 Use reduceGroup operator to optimize
build cube step
This is an automated email from the ASF dual-hosted git repository.
nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 599c46a351128000e37846f3b9341ff121c6b3e2
Author: yanghua <ya...@gmail.com>
AuthorDate: Tue Jul 16 15:33:58 2019 +0800
KYLIN-4087 Use reduceGroup operator to optimize build cube step
---
.../kylin/engine/flink/FlinkCubingByLayer.java | 91 ++++++++++++++++++++--
1 file changed, 86 insertions(+), 5 deletions(-)
diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingByLayer.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingByLayer.java
index 1551788..9e6f86f 100644
--- a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingByLayer.java
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingByLayer.java
@@ -21,6 +21,7 @@ import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.api.java.DataSet;
@@ -180,11 +181,11 @@ public class FlinkCubingByLayer extends AbstractApplication implements Serializa
totalCount = encodedBaseDataSet.count();
}
- final BaseCuboidReduceFunction baseCuboidReducerFunction = new BaseCuboidReduceFunction(cubeName, metaUrl, sConf);
+ final BaseCuboidReduceGroupFunction baseCuboidReducerFunction = new BaseCuboidReduceGroupFunction(cubeName, metaUrl, sConf);
- BaseCuboidReduceFunction reducerFunction = baseCuboidReducerFunction;
+ BaseCuboidReduceGroupFunction reducerFunction = baseCuboidReducerFunction;
if (!allNormalMeasure) {
- reducerFunction = new CuboidReduceFunction(cubeName, metaUrl, sConf, needAggr);
+ reducerFunction = new CuboidReduceGroupFunction(cubeName, metaUrl, sConf, needAggr);
}
final int totalLevels = cubeSegment.getCuboidScheduler().getBuildLevel();
@@ -192,14 +193,14 @@ public class FlinkCubingByLayer extends AbstractApplication implements Serializa
int level = 0;
// aggregate to calculate base cuboid
- allDataSets[0] = encodedBaseDataSet.groupBy(0).reduce(baseCuboidReducerFunction);
+ allDataSets[0] = encodedBaseDataSet.groupBy(0).reduceGroup(baseCuboidReducerFunction);
sinkToHDFS(allDataSets[0], metaUrl, cubeName, cubeSegment, outputPath, 0, Job.getInstance(), envConfig);
CuboidFlatMapFunction flatMapFunction = new CuboidFlatMapFunction(cubeName, segmentId, metaUrl, sConf);
for (level = 1; level <= totalLevels; level++) {
- allDataSets[level] = allDataSets[level - 1].flatMap(flatMapFunction).groupBy(0).reduce(reducerFunction);
+ allDataSets[level] = allDataSets[level - 1].flatMap(flatMapFunction).groupBy(0).reduceGroup(reducerFunction);
if (envConfig.isFlinkSanityCheckEnabled()) {
sanityCheck(allDataSets[level], totalCount, level, cubeStatsReader, countMeasureIndex);
}
@@ -321,6 +322,53 @@ public class FlinkCubingByLayer extends AbstractApplication implements Serializa
}
}
+ /**
+ * GroupReduce function that aggregates the measure arrays of all records sharing
+ * the same cuboid row key (the Tuple2 f0 ByteArray) into a single output record.
+ * Replaces the pairwise RichReduceFunction so the whole group is folded in one call.
+ */
+ private static class BaseCuboidReduceGroupFunction extends RichGroupReduceFunction<Tuple2<ByteArray, Object[]>, Tuple2<ByteArray, Object[]>> {
+
+ // Fields are protected so CuboidReduceGroupFunction (subclass) can reuse them.
+ protected String cubeName;
+ protected String metaUrl;
+ protected CubeDesc cubeDesc;
+ protected int measureNum;
+ protected MeasureAggregators aggregators;
+ protected SerializableConfiguration conf;
+
+ // Only serializable identifiers are captured here; the heavyweight cube
+ // metadata is resolved lazily in open() on each task manager.
+ public BaseCuboidReduceGroupFunction(String cubeName, String metaUrl, SerializableConfiguration conf) {
+ this.cubeName = cubeName;
+ this.metaUrl = metaUrl;
+ this.conf = conf;
+ }
+
+ // Loads Kylin config from HDFS and initializes cubeDesc, aggregators and
+ // measureNum; the thread-local config is unset again via try-with-resources.
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
+ try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
+ .setAndUnsetThreadLocalConfig(kConfig)) {
+ CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
+ cubeDesc = cubeInstance.getDescriptor();
+ aggregators = new MeasureAggregators(cubeDesc.getMeasures());
+ measureNum = cubeDesc.getMeasures().size();
+ }
+ }
+
+ // Folds all measure arrays of one key group into a single array. The first
+ // element seeds the accumulator; each later element is aggregated into a
+ // freshly allocated array so the inputs themselves are never mutated.
+ // NOTE(review): if Flink object reuse is enabled, holding on to item.f0/item.f1
+ // across iterations could see reused objects — presumably reuse is off here;
+ // verify against the job's ExecutionConfig.
+ @Override
+ public void reduce(Iterable<Tuple2<ByteArray, Object[]>> iterable, Collector<Tuple2<ByteArray, Object[]>> collector) throws Exception {
+ Object[] result = null;
+ ByteArray key = null;
+
+ for (Tuple2<ByteArray, Object[]> item : iterable) {
+ key = item.f0;
+ if (result == null) {
+ result = item.f1;
+ } else {
+ Object[] temp = new Object[measureNum];
+ aggregators.aggregate(item.f1, result, temp);
+ result = temp;
+ }
+ }
+
+ // Groups handed to reduceGroup are non-empty, so key/result are set here.
+ collector.collect(new Tuple2<>(key, result));
+ }
+ }
+
/**
* A reduce function used to aggregate base cuboid.
*/
@@ -361,6 +409,39 @@ public class FlinkCubingByLayer extends AbstractApplication implements Serializa
}
+ /**
+ * Variant of BaseCuboidReduceGroupFunction that aggregates based on a boolean
+ * flag array: needAgg[i] controls whether measure i is aggregated (used when
+ * not all measures are "normal", see the allNormalMeasure check in execute()).
+ */
+ private static class CuboidReduceGroupFunction extends BaseCuboidReduceGroupFunction {
+ private boolean[] needAgg;
+
+ public CuboidReduceGroupFunction(String cubeName, String metaUrl, SerializableConfiguration conf, boolean[] needAgg) {
+ super(cubeName, metaUrl, conf);
+ this.needAgg = needAgg;
+ }
+
+ // NOTE(review): this override only delegates to super.open() and adds
+ // nothing — it could be removed without changing behavior.
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ }
+
+ // Same fold as the base class, but passes needAgg so MeasureAggregators
+ // only aggregates the flagged measures.
+ @Override
+ public void reduce(Iterable<Tuple2<ByteArray, Object[]>> iterable, Collector<Tuple2<ByteArray, Object[]>> collector) throws Exception {
+ Object[] result = null;
+ ByteArray key = null;
+
+ for (Tuple2<ByteArray, Object[]> item : iterable) {
+ key = item.f0;
+ if (result == null) {
+ result = item.f1;
+ } else {
+ Object[] temp = new Object[measureNum];
+ aggregators.aggregate(item.f1, result, temp, needAgg);
+ result = temp;
+ }
+ }
+
+ collector.collect(new Tuple2<>(key, result));
+ }
+ }
+
/**
* A reduce function does aggregation based on boolean flag array.
*/