Posted to commits@kylin.apache.org by ni...@apache.org on 2019/12/23 03:14:37 UTC

[kylin] 24/30: KYLIN-4087 Use reduceGroup operator to optimize build cube step

This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 599c46a351128000e37846f3b9341ff121c6b3e2
Author: yanghua <ya...@gmail.com>
AuthorDate: Tue Jul 16 15:33:58 2019 +0800

    KYLIN-4087 Use reduceGroup operator to optimize build cube step
---
 .../kylin/engine/flink/FlinkCubingByLayer.java     | 91 ++++++++++++++++++++--
 1 file changed, 86 insertions(+), 5 deletions(-)
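
The patch swaps groupBy(0).reduce(...) for groupBy(0).reduceGroup(...) on the base
cuboid and on every layer of the build. In the Flink DataSet API, reduce() is
invoked once per pair of records inside a key group, so each invocation
typically allocates a fresh output object; reduceGroup() is invoked once per
key group and receives all of its records through a single Iterable, so one
accumulator can serve the whole group. The following minimal sketch shows the
two shapes side by side. It is not part of the commit: the class name and the
toy word-count data are illustrative only, assuming the Flink 1.x DataSet API
that this engine builds on.

import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class ReduceVsReduceGroup {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<String, Long>> input = env.fromElements(
                new Tuple2<>("a", 1L), new Tuple2<>("a", 2L), new Tuple2<>("b", 3L));

        // Before: reduce() runs once per pair of records in a key group,
        // allocating a new output tuple on every call.
        DataSet<Tuple2<String, Long>> viaReduce = input.groupBy(0)
                .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> reduce(Tuple2<String, Long> v1,
                            Tuple2<String, Long> v2) {
                        return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
                    }
                });

        // After: reduceGroup() runs once per key group; a single accumulator
        // is carried across the whole group and one tuple is emitted per key.
        DataSet<Tuple2<String, Long>> viaReduceGroup = input.groupBy(0)
                .reduceGroup(new GroupReduceFunction<Tuple2<String, Long>, Tuple2<String, Long>>() {
                    @Override
                    public void reduce(Iterable<Tuple2<String, Long>> values,
                            Collector<Tuple2<String, Long>> out) {
                        String key = null;
                        long sum = 0L;
                        for (Tuple2<String, Long> v : values) {
                            key = v.f0;
                            sum += v.f1;
                        }
                        out.collect(new Tuple2<>(key, sum));
                    }
                });

        viaReduce.print();
        viaReduceGroup.print();
    }
}

One trade-off worth noting: a ReduceFunction is implicitly combinable on the
input side, whereas a plain GroupReduceFunction (like the classes added below,
which do not implement a combine step) is not, so the gain here comes from
folding each group in a single pass instead of once per record pair.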

diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingByLayer.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingByLayer.java
index 1551788..9e6f86f 100644
--- a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingByLayer.java
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingByLayer.java
@@ -21,6 +21,7 @@ import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.api.java.DataSet;
@@ -180,11 +181,11 @@ public class FlinkCubingByLayer extends AbstractApplication implements Serializa
             totalCount = encodedBaseDataSet.count();
         }
 
-        final BaseCuboidReduceFunction baseCuboidReducerFunction = new BaseCuboidReduceFunction(cubeName, metaUrl, sConf);
+        final BaseCuboidReduceGroupFunction baseCuboidReducerFunction = new BaseCuboidReduceGroupFunction(cubeName, metaUrl, sConf);
 
-        BaseCuboidReduceFunction reducerFunction = baseCuboidReducerFunction;
+        BaseCuboidReduceGroupFunction reducerFunction = baseCuboidReducerFunction;
         if (!allNormalMeasure) {
-            reducerFunction = new CuboidReduceFunction(cubeName, metaUrl, sConf, needAggr);
+            reducerFunction = new CuboidReduceGroupFunction(cubeName, metaUrl, sConf, needAggr);
         }
 
         final int totalLevels = cubeSegment.getCuboidScheduler().getBuildLevel();
@@ -192,14 +193,14 @@ public class FlinkCubingByLayer extends AbstractApplication implements Serializa
         int level = 0;
 
         // aggregate to calculate base cuboid
-        allDataSets[0] = encodedBaseDataSet.groupBy(0).reduce(baseCuboidReducerFunction);
+        allDataSets[0] = encodedBaseDataSet.groupBy(0).reduceGroup(baseCuboidReducerFunction);
 
         sinkToHDFS(allDataSets[0], metaUrl, cubeName, cubeSegment, outputPath, 0, Job.getInstance(), envConfig);
 
         CuboidFlatMapFunction flatMapFunction = new CuboidFlatMapFunction(cubeName, segmentId, metaUrl, sConf);
 
         for (level = 1; level <= totalLevels; level++) {
-            allDataSets[level] = allDataSets[level - 1].flatMap(flatMapFunction).groupBy(0).reduce(reducerFunction);
+            allDataSets[level] = allDataSets[level - 1].flatMap(flatMapFunction).groupBy(0).reduceGroup(reducerFunction);
             if (envConfig.isFlinkSanityCheckEnabled()) {
                 sanityCheck(allDataSets[level], totalCount, level, cubeStatsReader, countMeasureIndex);
             }
@@ -321,6 +322,53 @@ public class FlinkCubingByLayer extends AbstractApplication implements Serializa
         }
     }
 
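+    /**
+     * A group reduce function used to aggregate the base cuboid. Unlike the
+     * pairwise BaseCuboidReduceFunction below, it folds an entire key group
+     * in a single call.
+     */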
+    private static class BaseCuboidReduceGroupFunction extends RichGroupReduceFunction<Tuple2<ByteArray, Object[]>, Tuple2<ByteArray, Object[]>> {
+
+        protected String cubeName;
+        protected String metaUrl;
+        protected CubeDesc cubeDesc;
+        protected int measureNum;
+        protected MeasureAggregators aggregators;
+        protected SerializableConfiguration conf;
+
+        public BaseCuboidReduceGroupFunction(String cubeName, String metaUrl, SerializableConfiguration conf) {
+            this.cubeName = cubeName;
+            this.metaUrl = metaUrl;
+            this.conf = conf;
+        }
+
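+        // Called once per task before any reduce() call: load the Kylin
+        // config from HDFS and build the measure aggregators for this cube.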
+        @Override
+        public void open(Configuration parameters) throws Exception {
+            KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
+            try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
+                    .setAndUnsetThreadLocalConfig(kConfig)) {
+                CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
+                cubeDesc = cubeInstance.getDescriptor();
+                aggregators = new MeasureAggregators(cubeDesc.getMeasures());
+                measureNum = cubeDesc.getMeasures().size();
+            }
+        }
+
+        @Override
+        public void reduce(Iterable<Tuple2<ByteArray, Object[]>> iterable, Collector<Tuple2<ByteArray, Object[]>> collector) throws Exception {
+            Object[] result = null;
+            ByteArray key = null;
+
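+            // Fold the whole key group in one pass: the first tuple seeds
+            // the accumulator and each later tuple is merged into a fresh
+            // array by the measure aggregators.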
+            for (Tuple2<ByteArray, Object[]> item : iterable) {
+                key = item.f0;
+                if (result == null) {
+                    result = item.f1;
+                } else {
+                    Object[] temp = new Object[measureNum];
+                    aggregators.aggregate(item.f1, result, temp);
+                    result = temp;
+                }
+            }
+
+            collector.collect(new Tuple2<>(key, result));
+        }
+    }
+
     /**
      * A reduce function used to aggregate the base cuboid.
      */
@@ -361,6 +409,39 @@ public class FlinkCubingByLayer extends AbstractApplication implements Serializa
 
     }
 
+    private static class CuboidReduceGroupFunction extends BaseCuboidReduceGroupFunction {
+        private boolean[] needAgg;
+
+        public CuboidReduceGroupFunction(String cubeName, String metaUrl, SerializableConfiguration conf, boolean[] needAgg) {
+            super(cubeName, metaUrl, conf);
+            this.needAgg = needAgg;
+        }
+
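+        /**
+         * Same single-pass group aggregation as the parent class, except
+         * that only the measures flagged in needAgg are aggregated.
+         */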
+        @Override
+        public void reduce(Iterable<Tuple2<ByteArray, Object[]>> iterable, Collector<Tuple2<ByteArray, Object[]>> collector) throws Exception {
+            Object[] result = null;
+            ByteArray key = null;
+
+            for (Tuple2<ByteArray, Object[]> item : iterable) {
+                key = item.f0;
+                if (result == null) {
+                    result = item.f1;
+                } else {
+                    Object[] temp = new Object[measureNum];
+                    aggregators.aggregate(item.f1, result, temp, needAgg);
+                    result = temp;
+                }
+            }
+
+            collector.collect(new Tuple2<>(key, result));
+        }
+    }
+
     /**
      * A reduce function that does aggregation based on a boolean flag array.
      */