Posted to issues@kylin.apache.org by GitBox <gi...@apache.org> on 2021/04/07 02:43:10 UTC

[GitHub] [kylin] zhengshengjun commented on a change in pull request #1583: KYLIN-4903 cache parent datasource to accelerate next layer's cuboid building

zhengshengjun commented on a change in pull request #1583:
URL: https://github.com/apache/kylin/pull/1583#discussion_r608301752



##########
File path: kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
##########
@@ -330,6 +330,11 @@ private void build(Collection<NBuildSourceInfo> buildSourceInfos, SegmentInfo se
             cuboidsNumInLayer += toBuildCuboids.size();
             Preconditions.checkState(!toBuildCuboids.isEmpty(), "To be built cuboids is empty.");
             Dataset<Row> parentDS = info.getParentDS();
+
+            if (toBuildCuboids.size() >= 1) {

Review comment:
       You are right, fixed.

##########
File path: kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/BuildLayoutWithUpdate.java
##########
@@ -38,8 +46,45 @@
     private ExecutorService pool = Executors.newCachedThreadPool();
     private CompletionService<JobResult> completionService = new ExecutorCompletionService<>(pool);
     private int currentLayoutsNum = 0;
+    private Map<Long, AtomicLong> toBuildCuboidSize = new ConcurrentHashMap<>();
+    private Semaphore semaphore;
+    private Map<Long, Dataset<Row>> layout2DataSet = new ConcurrentHashMap<>();
+    private KylinConfig kylinConfig;
+    private boolean persistParentDataset;
+
+    public BuildLayoutWithUpdate(KylinConfig kylinConfig) {
+        this.kylinConfig = kylinConfig;
+        this.persistParentDataset = !kylinConfig.getParentDatasetStorageLevel().equals(StorageLevel.NONE());

Review comment:
       It's my fault, thanks for finding it.

##########
File path: kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/BuildLayoutWithUpdate.java
##########
@@ -38,8 +46,45 @@
     private ExecutorService pool = Executors.newCachedThreadPool();
     private CompletionService<JobResult> completionService = new ExecutorCompletionService<>(pool);
     private int currentLayoutsNum = 0;
+    private Map<Long, AtomicLong> toBuildCuboidSize = new ConcurrentHashMap<>();
+    private Semaphore semaphore;
+    private Map<Long, Dataset<Row>> layout2DataSet = new ConcurrentHashMap<>();
+    private KylinConfig kylinConfig;
+    private boolean persistParentDataset;
+
+    public BuildLayoutWithUpdate(KylinConfig kylinConfig) {
+        this.kylinConfig = kylinConfig;
+        this.persistParentDataset = !kylinConfig.getParentDatasetStorageLevel().equals(StorageLevel.NONE());
+        if (this.persistParentDataset) {
+            this.semaphore = new Semaphore(kylinConfig.getMaxParentDatasetPersistCount());
+        }
+    }
+
+    public void cacheAndRegister(long layoutId, Dataset<Row> dataset) throws InterruptedException{
+        if (!persistParentDataset) {
+            return;
+        }
+        logger.info("persist dataset of layout: {}", layoutId);
+        semaphore.acquire();
+        layout2DataSet.put(layoutId, dataset);
+        dataset.persist(StorageLevel.fromString(kylinConfig.getParentDatasetStorageLevel()));

Review comment:
       Good suggestion.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org