You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2020/09/04 01:59:59 UTC

[kylin] branch kylin-on-parquet-v2 updated: KYLIN-4746 Improve build performance by reducing the count of calling 'count()' function

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
     new 0da41e2  KYLIN-4746 Improve build performance by reducing the count of calling 'count()' function
0da41e2 is described below

commit 0da41e20a652794d9328c819977754ec8c4f9941
Author: Zhichao Zhang <44...@qq.com>
AuthorDate: Thu Sep 3 23:19:04 2020 +0800

    KYLIN-4746 Improve build performance by reducing the count of calling 'count()' function
---
 .../kylin/engine/spark/job/CubeBuildJob.java       | 26 +++++++++++++---------
 1 file changed, 16 insertions(+), 10 deletions(-)

diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
index 3a44d84..bbf50e8 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
@@ -77,7 +77,8 @@ public class CubeBuildJob extends SparkApplication {
     private CubeManager cubeManager;
     private CubeInstance cubeInstance;
     private BuildLayoutWithUpdate buildLayoutWithUpdate;
-    private Map<Long, Short> cuboidShardNum = Maps.newHashMap();
+    private Map<Long, Short> cuboidShardNum = Maps.newConcurrentMap();
+    private Map<Long, Long> cuboidsRowCount = Maps.newConcurrentMap();
     public static void main(String[] args) {
         CubeBuildJob cubeBuildJob = new CubeBuildJob();
         cubeBuildJob.execute(args);
@@ -217,7 +218,10 @@ public class CubeBuildJob extends SparkApplication {
             cuboidsNumInLayer += toBuildCuboids.size();
             Preconditions.checkState(!toBuildCuboids.isEmpty(), "To be built cuboids is empty.");
             Dataset<Row> parentDS = info.getParentDS();
-            long parentDSCnt = parentDS.count();
+            // record the source count of flat table
+            if (info.getLayoutId() == ParentSourceChooser.FLAT_TABLE_FLAG()) {
+                cuboidsRowCount.putIfAbsent(info.getLayoutId(), parentDS.count());
+            }
 
             for (LayoutEntity index : toBuildCuboids) {
                 Preconditions.checkNotNull(parentDS, "Parent dataset is null when building.");
@@ -229,8 +233,7 @@ public class CubeBuildJob extends SparkApplication {
 
                     @Override
                     public LayoutEntity build() throws IOException {
-                        return buildCuboid(seg, index, parentDS, st, info.getLayoutId(),
-                                parentDSCnt);
+                        return buildCuboid(seg, index, parentDS, st, info.getLayoutId());
                     }
                 }, config);
                 allIndexesInCurrentLayer.add(index);
@@ -292,7 +295,7 @@ public class CubeBuildJob extends SparkApplication {
     }
 
     private LayoutEntity buildCuboid(SegmentInfo seg, LayoutEntity cuboid, Dataset<Row> parent,
-                                    SpanningTree spanningTree, long parentId, long parentDSCnt) throws IOException {
+                                    SpanningTree spanningTree, long parentId) throws IOException {
         String parentName = String.valueOf(parentId);
         if (parentId == ParentSourceChooser.FLAT_TABLE_FLAG()) {
             parentName = "flat table";
@@ -308,7 +311,7 @@ public class CubeBuildJob extends SparkApplication {
             Set<Integer> orderedDims = layoutEntity.getOrderedDimensions().keySet();
             Dataset<Row> afterSort = afterPrj.select(NSparkCubingUtil.getColumns(orderedDims))
                     .sortWithinPartitions(NSparkCubingUtil.getColumns(orderedDims));
-            saveAndUpdateLayout(afterSort, seg, layoutEntity, parentDSCnt);
+            saveAndUpdateLayout(afterSort, seg, layoutEntity, parentId);
         } else {
             Dataset<Row> afterAgg = CuboidAggregator.agg(ss, parent, dimIndexes, cuboid.getOrderedMeasures(),
                     spanningTree, false);
@@ -320,7 +323,7 @@ public class CubeBuildJob extends SparkApplication {
                     .select(NSparkCubingUtil.getColumns(rowKeys, layoutEntity.getOrderedMeasures().keySet()))
                     .sortWithinPartitions(NSparkCubingUtil.getColumns(rowKeys));
 
-            saveAndUpdateLayout(afterSort, seg, layoutEntity, parentDSCnt);
+            saveAndUpdateLayout(afterSort, seg, layoutEntity, parentId);
         }
         ss.sparkContext().setJobDescription(null);
         logger.info("Finished Build index :{}, in segment:{}", cuboid.getId(), seg.id());
@@ -328,7 +331,7 @@ public class CubeBuildJob extends SparkApplication {
     }
 
     private void saveAndUpdateLayout(Dataset<Row> dataset, SegmentInfo seg, LayoutEntity layout,
-                                     long parentDSCnt) throws IOException {
+                                     long parentId) throws IOException {
         long layoutId = layout.getId();
 
         // for spark metrics
@@ -349,8 +352,11 @@ public class CubeBuildJob extends SparkApplication {
         if (rowCount == -1) {
             infos.recordAbnormalLayouts(layoutId, "'Job metrics seems null, use count() to collect cuboid rows.'");
             logger.debug("Can not get cuboid row cnt, use count() to collect cuboid rows.");
-            layout.setRows(dataset.count());
-            layout.setSourceRows(parentDSCnt);
+            long cuboidRowCnt = dataset.count();
+            layout.setRows(cuboidRowCnt);
+            // record the row count of cuboid
+            cuboidsRowCount.putIfAbsent(layoutId, cuboidRowCnt);
+            layout.setSourceRows(cuboidsRowCount.get(parentId));
         } else {
             layout.setRows(rowCount);
             layout.setSourceRows(metrics.getMetrics(Metrics.SOURCE_ROWS_CNT()));