Posted to commits@kylin.apache.org by xx...@apache.org on 2020/09/04 01:59:59 UTC
[kylin] branch kylin-on-parquet-v2 updated: KYLIN-4746 Improve build performance by reducing the count of calling 'count()' function
This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
new 0da41e2 KYLIN-4746 Improve build performance by reducing the count of calling 'count()' function
0da41e2 is described below
commit 0da41e20a652794d9328c819977754ec8c4f9941
Author: Zhichao Zhang <44...@qq.com>
AuthorDate: Thu Sep 3 23:19:04 2020 +0800
KYLIN-4746 Improve build performance by reducing the count of calling 'count()' function
---
.../kylin/engine/spark/job/CubeBuildJob.java | 26 +++++++++++++---------
1 file changed, 16 insertions(+), 10 deletions(-)
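Before the diff itself, a quick illustration of the pattern the patch applies. Previously every layer called parentDS.count() to get the source row count, which launches a full Spark job each time; the patch instead records row counts once in a map keyed by layout id and reuses them for child cuboids. Below is a minimal, Spark-free sketch of that memoization; RowCountCache, expensiveCount and rowCount are illustrative names (not from the patch), and expensiveCount stands in for Dataset.count():

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class RowCountCache {
    // Keyed by layout id, mirroring the new cuboidsRowCount field.
    // Guava's Maps.newConcurrentMap(), used in the patch, is backed by
    // a ConcurrentHashMap just like this one.
    private final Map<Long, Long> rowCounts = new ConcurrentHashMap<>();

    // Stand-in for Dataset.count(): an expensive full scan of the data.
    private long expensiveCount(long layoutId) {
        System.out.println("counting layout " + layoutId);
        return 42L;
    }

    // Count each layout at most once; later callers get the cached value.
    public long rowCount(long layoutId) {
        return rowCounts.computeIfAbsent(layoutId, this::expensiveCount);
    }

    public static void main(String[] args) {
        RowCountCache cache = new RowCountCache();
        cache.rowCount(1L); // triggers the expensive count
        cache.rowCount(1L); // served from the map, no second count
    }
}

Note that the patch guards putIfAbsent with an explicit layout-id check instead; computeIfAbsent, as sketched here, folds the check and the insert into one step and skips the expensive call entirely when a value is already cached.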
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
index 3a44d84..bbf50e8 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
@@ -77,7 +77,8 @@ public class CubeBuildJob extends SparkApplication {
private CubeManager cubeManager;
private CubeInstance cubeInstance;
private BuildLayoutWithUpdate buildLayoutWithUpdate;
- private Map<Long, Short> cuboidShardNum = Maps.newHashMap();
+ private Map<Long, Short> cuboidShardNum = Maps.newConcurrentMap();
+ private Map<Long, Long> cuboidsRowCount = Maps.newConcurrentMap();
public static void main(String[] args) {
CubeBuildJob cubeBuildJob = new CubeBuildJob();
cubeBuildJob.execute(args);
@@ -217,7 +218,10 @@ public class CubeBuildJob extends SparkApplication {
cuboidsNumInLayer += toBuildCuboids.size();
Preconditions.checkState(!toBuildCuboids.isEmpty(), "To be built cuboids is empty.");
Dataset<Row> parentDS = info.getParentDS();
- long parentDSCnt = parentDS.count();
+ // record the source count of flat table
+ if (info.getLayoutId() == ParentSourceChooser.FLAT_TABLE_FLAG()) {
+ cuboidsRowCount.putIfAbsent(info.getLayoutId(), parentDS.count());
+ }
for (LayoutEntity index : toBuildCuboids) {
Preconditions.checkNotNull(parentDS, "Parent dataset is null when building.");
@@ -229,8 +233,7 @@ public class CubeBuildJob extends SparkApplication {
@Override
public LayoutEntity build() throws IOException {
- return buildCuboid(seg, index, parentDS, st, info.getLayoutId(),
- parentDSCnt);
+ return buildCuboid(seg, index, parentDS, st, info.getLayoutId());
}
}, config);
allIndexesInCurrentLayer.add(index);
@@ -292,7 +295,7 @@ public class CubeBuildJob extends SparkApplication {
}
private LayoutEntity buildCuboid(SegmentInfo seg, LayoutEntity cuboid, Dataset<Row> parent,
- SpanningTree spanningTree, long parentId, long parentDSCnt) throws IOException {
+ SpanningTree spanningTree, long parentId) throws IOException {
String parentName = String.valueOf(parentId);
if (parentId == ParentSourceChooser.FLAT_TABLE_FLAG()) {
parentName = "flat table";
@@ -308,7 +311,7 @@ public class CubeBuildJob extends SparkApplication {
Set<Integer> orderedDims = layoutEntity.getOrderedDimensions().keySet();
Dataset<Row> afterSort = afterPrj.select(NSparkCubingUtil.getColumns(orderedDims))
.sortWithinPartitions(NSparkCubingUtil.getColumns(orderedDims));
- saveAndUpdateLayout(afterSort, seg, layoutEntity, parentDSCnt);
+ saveAndUpdateLayout(afterSort, seg, layoutEntity, parentId);
} else {
Dataset<Row> afterAgg = CuboidAggregator.agg(ss, parent, dimIndexes, cuboid.getOrderedMeasures(),
spanningTree, false);
@@ -320,7 +323,7 @@ public class CubeBuildJob extends SparkApplication {
.select(NSparkCubingUtil.getColumns(rowKeys, layoutEntity.getOrderedMeasures().keySet()))
.sortWithinPartitions(NSparkCubingUtil.getColumns(rowKeys));
- saveAndUpdateLayout(afterSort, seg, layoutEntity, parentDSCnt);
+ saveAndUpdateLayout(afterSort, seg, layoutEntity, parentId);
}
ss.sparkContext().setJobDescription(null);
logger.info("Finished Build index :{}, in segment:{}", cuboid.getId(), seg.id());
@@ -328,7 +331,7 @@ public class CubeBuildJob extends SparkApplication {
}
private void saveAndUpdateLayout(Dataset<Row> dataset, SegmentInfo seg, LayoutEntity layout,
- long parentDSCnt) throws IOException {
+ long parentId) throws IOException {
long layoutId = layout.getId();
// for spark metrics
@@ -349,8 +352,11 @@ public class CubeBuildJob extends SparkApplication {
if (rowCount == -1) {
infos.recordAbnormalLayouts(layoutId, "'Job metrics seems null, use count() to collect cuboid rows.'");
logger.debug("Can not get cuboid row cnt, use count() to collect cuboid rows.");
- layout.setRows(dataset.count());
- layout.setSourceRows(parentDSCnt);
+ long cuboidRowCnt = dataset.count();
+ layout.setRows(cuboidRowCnt);
+ // record the row count of cuboid
+ cuboidsRowCount.putIfAbsent(layoutId, cuboidRowCnt);
+ layout.setSourceRows(cuboidsRowCount.get(parentId));
} else {
layout.setRows(rowCount);
layout.setSourceRows(metrics.getMetrics(Metrics.SOURCE_ROWS_CNT()));
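A closing note on the first hunk: cuboidShardNum switches from Maps.newHashMap() to Maps.newConcurrentMap(), and the new cuboidsRowCount map is concurrent as well, presumably because buildLayoutWithUpdate runs the per-cuboid build() callbacks on multiple threads, so the shared maps now have concurrent writers. A small self-contained sketch of why that matters (the thread setup is illustrative, not Kylin code); with a plain HashMap the concurrent puts below would be unsafe:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ConcurrentRecordDemo {
    public static void main(String[] args) throws InterruptedException {
        // Plays the role of cuboidsRowCount shared across builder threads.
        Map<Long, Long> cuboidsRowCount = new ConcurrentHashMap<>();
        Thread[] builders = new Thread[4];
        for (int i = 0; i < builders.length; i++) {
            final long layoutId = i;
            builders[i] = new Thread(() ->
                    // Safe under concurrency; putIfAbsent also keeps the
                    // first recorded count if two threads race on one key.
                    cuboidsRowCount.putIfAbsent(layoutId, layoutId * 100));
            builders[i].start();
        }
        for (Thread t : builders) {
            t.join();
        }
        // e.g. {0=0, 1=100, 2=200, 3=300} (iteration order not guaranteed)
        System.out.println(cuboidsRowCount);
    }
}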