You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by zh...@apache.org on 2021/04/09 08:02:30 UTC

[kylin] branch kylin-on-parquet-v2 updated: KYLIN-4903 cache parent datasource to accelerate next layer's cuboid building

This is an automated email from the ASF dual-hosted git repository.

zhangzc pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
     new d554c76  KYLIN-4903 cache parent datasource to accelerate next layer's cuboid building
d554c76 is described below

commit d554c762838ddac4bf5e38d294e049788e6b8423
Author: zhengshengjun <sh...@sina.com>
AuthorDate: Thu Mar 4 16:42:38 2021 +0800

    KYLIN-4903 cache parent datasource to accelerate next layer's cuboid building
---
 .../org/apache/kylin/common/KylinConfigBase.java   |  8 +++
 .../engine/spark/job/BuildLayoutWithUpdate.java    | 61 ++++++++++++++++++++++
 .../kylin/engine/spark/job/CubeBuildJob.java       | 16 ++++--
 .../kylin/engine/spark/job/CubeMergeJob.java       |  8 ++-
 4 files changed, 89 insertions(+), 4 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index ed9a243..34f8ce9 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -3096,4 +3096,12 @@ public abstract class KylinConfigBase implements Serializable {
     public String getKerberosPrincipal() {
         return getOptional("kylin.kerberos.principal");
     }
+
+    public String getParentDatasetStorageLevel() { // Spark storage level name used to cache parent datasets
+        return getOptional("kylin.engine.spark.parent-dataset.storage.level", "NONE"); // "NONE" is the default
+    }
+
+    public int getMaxParentDatasetPersistCount() { // upper bound on parent datasets persisted at the same time
+        return Integer.parseInt(getOptional("kylin.engine.spark.parent-dataset.max.persist.count", "1")); // defaults to 1
+    }
 }
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/BuildLayoutWithUpdate.java b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/BuildLayoutWithUpdate.java
index b07f848..91024b9 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/BuildLayoutWithUpdate.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/BuildLayoutWithUpdate.java
@@ -19,6 +19,7 @@
 package org.apache.kylin.engine.spark.job;
 
 import java.io.IOException;
+import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ExecutionException;
@@ -26,10 +27,17 @@ import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.engine.spark.builder.NBuildSourceInfo;
 import org.apache.kylin.engine.spark.metadata.SegmentInfo;
 import org.apache.kylin.engine.spark.metadata.cube.model.LayoutEntity;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.storage.StorageLevel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,8 +46,48 @@ public class BuildLayoutWithUpdate {
     private ExecutorService pool = Executors.newCachedThreadPool();
     private CompletionService<JobResult> completionService = new ExecutorCompletionService<>(pool);
     private int currentLayoutsNum = 0;
+    private Map<Long, AtomicLong> toBuildCuboidSize = new ConcurrentHashMap<>();
+    private Semaphore semaphore;
+    private Map<Long, Dataset<Row>> layout2DataSet = new ConcurrentHashMap<>();
+    private StorageLevel storageLevel;
+    private boolean persistParentDataset;
+
+    public BuildLayoutWithUpdate(KylinConfig kylinConfig) { // reads the parent-dataset caching settings from config
+        this.storageLevel = StorageLevel.fromString(kylinConfig.getParentDatasetStorageLevel());
+        this.persistParentDataset = !storageLevel.equals(StorageLevel.NONE()); // NONE turns parent caching off
+        if (this.persistParentDataset) {
+            if (kylinConfig.getMaxParentDatasetPersistCount() < 1) { // the semaphore below needs at least one permit
+                throw new IllegalArgumentException("max parent dataset persist count should be at least 1");
+            }
+            this.semaphore = new Semaphore(kylinConfig.getMaxParentDatasetPersistCount()); // one permit per cached parent
+        }
+    }
+
+    public void cacheAndRegister(long layoutId, Dataset<Row> dataset) throws InterruptedException {
+        // no-op when parent caching is disabled; otherwise waits for a free semaphore permit before persisting
+        if (persistParentDataset) {
+            logger.info("persist dataset of layout: {}", layoutId);
+            semaphore.acquire();
+            layout2DataSet.put(layoutId, dataset);
+            dataset.persist(storageLevel);
+        }
+    }
 
     public void submit(JobEntity job, KylinConfig config) {
+
+        //if job's BuildSourceInfo is empty, it means this is a merge job, no parent dataset to persist
+        if (persistParentDataset && job.getBuildSourceInfo() != null && job.getBuildSourceInfo().getToBuildCuboids().size() > 1) {
+            //when reuse parent dataset is enabled, ensure parent dataset is registered
+            if(!layout2DataSet.containsKey(job.getBuildSourceInfo().getLayoutId())){
+                logger.error("persist parent dataset is enabled, but parent dataset not registered");
+                throw new RuntimeException("parent dataset not registered");
+            }
+            if (!toBuildCuboidSize.containsKey(job.getBuildSourceInfo().getLayoutId())) {
+                toBuildCuboidSize.put(job.getBuildSourceInfo().getLayoutId(),
+                        new AtomicLong(job.getBuildSourceInfo().getToBuildCuboids().size()));
+            }
+        }
+
         completionService.submit(new Callable<JobResult>() {
             @Override
             public JobResult call() throws Exception {
@@ -52,6 +100,17 @@ public class BuildLayoutWithUpdate {
                 } catch (Throwable t) {
                     logger.error("Error occurred when run " + job.getName(), t);
                     throwable = t;
+                } finally {
+                    //unpersist parent dataset
+                    if (persistParentDataset && job.getBuildSourceInfo() != null && job.getBuildSourceInfo().getToBuildCuboids().size() > 1) {
+                        long remain = toBuildCuboidSize.get(job.getBuildSourceInfo().getLayoutId()).decrementAndGet();
+                        if (remain == 0) {
+                            toBuildCuboidSize.remove(job.getBuildSourceInfo().getLayoutId());
+                            layout2DataSet.get(job.getBuildSourceInfo().getLayoutId()).unpersist();
+                            logger.info("dataset of layout: {} released", job.getBuildSourceInfo().getLayoutId());
+                            semaphore.release();
+                        }
+                    }
                 }
                 return new JobResult(dataLayouts, throwable);
             }
@@ -115,5 +174,7 @@ public class BuildLayoutWithUpdate {
         public abstract String getName();
 
         public abstract LayoutEntity build() throws IOException;
+
+        public abstract NBuildSourceInfo getBuildSourceInfo();
     }
 }
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
index b463dad..efce341 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
@@ -157,7 +157,7 @@ public class CubeBuildJob extends SparkApplication {
                 logger.info("Triggered cube planner phase one .");
         }
 
-        buildLayoutWithUpdate = new BuildLayoutWithUpdate();
+        buildLayoutWithUpdate = new BuildLayoutWithUpdate(config);
         List<String> persistedFlatTable = new ArrayList<>();
         List<String> persistedViewFactTable = new ArrayList<>();
         Path shareDir = config.getJobTmpShareDir(project, jobId);
@@ -297,7 +297,7 @@ public class CubeBuildJob extends SparkApplication {
         }
     }
 
-    private void build(Collection<NBuildSourceInfo> buildSourceInfos, SegmentInfo seg, SpanningTree st) {
+    private void build(Collection<NBuildSourceInfo> buildSourceInfos, SegmentInfo seg, SpanningTree st) throws InterruptedException{
 
         List<NBuildSourceInfo> theFirstLevelBuildInfos = buildLayer(buildSourceInfos, seg, st);
         LinkedList<List<NBuildSourceInfo>> queue = new LinkedList<>();
@@ -318,7 +318,7 @@ public class CubeBuildJob extends SparkApplication {
 
     // build current layer and return the next layer to be built.
     private List<NBuildSourceInfo> buildLayer(Collection<NBuildSourceInfo> buildSourceInfos, SegmentInfo seg,
-                                              SpanningTree st) {
+                                              SpanningTree st) throws InterruptedException{
         int cuboidsNumInLayer = 0;
 
         // build current layer
@@ -330,6 +330,11 @@ public class CubeBuildJob extends SparkApplication {
             cuboidsNumInLayer += toBuildCuboids.size();
             Preconditions.checkState(!toBuildCuboids.isEmpty(), "To be built cuboids is empty.");
             Dataset<Row> parentDS = info.getParentDS();
+
+            if (toBuildCuboids.size() > 1) {
+                buildLayoutWithUpdate.cacheAndRegister(info.getLayoutId(), parentDS);
+            }
+
             // record the source count of flat table
             if (info.getLayoutId() == ParentSourceChooser.FLAT_TABLE_FLAG()) {
                 cuboidsRowCount.putIfAbsent(info.getLayoutId(), parentDS.count());
@@ -347,6 +352,11 @@ public class CubeBuildJob extends SparkApplication {
                     public LayoutEntity build() throws IOException {
                         return buildCuboid(seg, index, parentDS, st, info.getLayoutId());
                     }
+
+                    @Override
+                    public NBuildSourceInfo getBuildSourceInfo() {
+                        return info; // the source info whose parent dataset this cuboid is built from
+                    }
                 }, config);
                 allIndexesInCurrentLayer.add(index);
             }
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java
index 12e939d..83bc601 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java
@@ -28,6 +28,7 @@ import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.CubeUpdate;
+import org.apache.kylin.engine.spark.builder.NBuildSourceInfo;
 import org.apache.kylin.engine.spark.metadata.SegmentInfo;
 import org.apache.kylin.engine.spark.metadata.cube.ManagerHub;
 import org.apache.kylin.engine.spark.metadata.cube.PathManager;
@@ -67,7 +68,7 @@ public class CubeMergeJob extends SparkApplication {
 
     @Override
     protected void doExecute() throws Exception {
-        buildLayoutWithUpdate = new BuildLayoutWithUpdate();
+        buildLayoutWithUpdate = new BuildLayoutWithUpdate(config);
         String cubeId = getParam(MetadataConstants.P_CUBE_ID);
         String newSegmentId = getParam(MetadataConstants.P_SEGMENT_IDS);
         final CubeManager cubeManager = CubeManager.getInstance(config);
@@ -117,6 +118,11 @@ public class CubeMergeJob extends SparkApplication {
                 public LayoutEntity build() throws IOException {
                     return saveAndUpdateCuboid(afterSort, mergedSegInfo, layout, assist);
                 }
+
+                @Override
+                public NBuildSourceInfo getBuildSourceInfo() {
+                    return null; // merge jobs carry no build source info, so submit() skips parent dataset handling
+                }
             }, config);
 
             buildLayoutWithUpdate.updateLayout(mergedSegInfo, config);