You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by zh...@apache.org on 2021/04/09 08:02:30 UTC
[kylin] branch kylin-on-parquet-v2 updated: KYLIN-4903 cache parent
datasource to accelerate next layer's cuboid building
This is an automated email from the ASF dual-hosted git repository.
zhangzc pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
new d554c76 KYLIN-4903 cache parent datasource to accelerate next layer's cuboid building
d554c76 is described below
commit d554c762838ddac4bf5e38d294e049788e6b8423
Author: zhengshengjun <sh...@sina.com>
AuthorDate: Thu Mar 4 16:42:38 2021 +0800
KYLIN-4903 cache parent datasource to accelerate next layer's cuboid building
---
.../org/apache/kylin/common/KylinConfigBase.java | 8 +++
.../engine/spark/job/BuildLayoutWithUpdate.java | 61 ++++++++++++++++++++++
.../kylin/engine/spark/job/CubeBuildJob.java | 16 ++++--
.../kylin/engine/spark/job/CubeMergeJob.java | 8 ++-
4 files changed, 89 insertions(+), 4 deletions(-)
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index ed9a243..34f8ce9 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -3096,4 +3096,12 @@ public abstract class KylinConfigBase implements Serializable {
public String getKerberosPrincipal() {
return getOptional("kylin.kerberos.principal");
}
+
+ public String getParentDatasetStorageLevel() { // storage level name that BuildLayoutWithUpdate parses with StorageLevel.fromString
+ return getOptional("kylin.engine.spark.parent-dataset.storage.level", "NONE"); // "NONE" is passed as the fallback (presumably used when the key is unset); BuildLayoutWithUpdate treats NONE as "do not persist"
+ }
+
+ public int getMaxParentDatasetPersistCount() { // number of permits BuildLayoutWithUpdate gives the semaphore that guards persisting parent datasets
+ return Integer.parseInt(getOptional("kylin.engine.spark.parent-dataset.max.persist.count", "1")); // "1" is passed as the fallback; parseInt throws NumberFormatException on a non-numeric value
+ }
}
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/BuildLayoutWithUpdate.java b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/BuildLayoutWithUpdate.java
index b07f848..91024b9 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/BuildLayoutWithUpdate.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/BuildLayoutWithUpdate.java
@@ -19,6 +19,7 @@
package org.apache.kylin.engine.spark.job;
import java.io.IOException;
+import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
@@ -26,10 +27,17 @@ import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.engine.spark.builder.NBuildSourceInfo;
import org.apache.kylin.engine.spark.metadata.SegmentInfo;
import org.apache.kylin.engine.spark.metadata.cube.model.LayoutEntity;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.storage.StorageLevel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,8 +46,48 @@ public class BuildLayoutWithUpdate {
private ExecutorService pool = Executors.newCachedThreadPool();
private CompletionService<JobResult> completionService = new ExecutorCompletionService<>(pool);
private int currentLayoutsNum = 0;
+ private Map<Long, AtomicLong> toBuildCuboidSize = new ConcurrentHashMap<>();
+ private Semaphore semaphore;
+ private Map<Long, Dataset<Row>> layout2DataSet = new ConcurrentHashMap<>();
+ private StorageLevel storageLevel;
+ private boolean persistParentDataset;
+
+ public BuildLayoutWithUpdate(KylinConfig kylinConfig) { // configures optional persisting of parent datasets; throws IllegalArgumentException when persisting is enabled with a max persist count below 1
+ this.storageLevel = StorageLevel.fromString(kylinConfig.getParentDatasetStorageLevel()); // level name comes from kylin.engine.spark.parent-dataset.storage.level
+ this.persistParentDataset = !storageLevel.equals(StorageLevel.NONE()); // any level other than NONE enables persisting
+ if (this.persistParentDataset) {
+ if (kylinConfig.getMaxParentDatasetPersistCount() < 1) { // 1 is accepted: the semaphore only needs a single permit for cacheAndRegister() to make progress
+ throw new IllegalArgumentException("max parent dataset persist count should be at least 1");
+ }
+ this.semaphore = new Semaphore(kylinConfig.getMaxParentDatasetPersistCount()); // one permit per parent dataset that may be persisted at the same time
+ }
+ }
+
+ public void cacheAndRegister(long layoutId, Dataset<Row> dataset) throws InterruptedException{ // registers a parent dataset under layoutId and persists it; does nothing when persisting is disabled
+ if (!persistParentDataset) {
+ return;
+ }
+ logger.info("persist dataset of layout: {}", layoutId); // logged before the permit is acquired, so the persist itself may still be waiting
+ semaphore.acquire(); // blocks until a permit is free (InterruptedException if interrupted while waiting); submit() releases it after the last cuboid built from this layout finishes
+ layout2DataSet.put(layoutId, dataset); // submit() requires this registration and later looks the dataset up here to unpersist it
+ dataset.persist(storageLevel);
+ }
public void submit(JobEntity job, KylinConfig config) {
+
+ //if job's BuildSourceInfo is empty, it means this is a merge job, no parent dataset to persist
+ if (persistParentDataset && job.getBuildSourceInfo() != null && job.getBuildSourceInfo().getToBuildCuboids().size() > 1) {
+ //when reuse parent dataset is enabled, ensure parent dataset is registered
+ if(!layout2DataSet.containsKey(job.getBuildSourceInfo().getLayoutId())){
+ logger.error("persist parent dataset is enabled, but parent dataset not registered");
+ throw new RuntimeException("parent dataset not registered");
+ }
+ if (!toBuildCuboidSize.containsKey(job.getBuildSourceInfo().getLayoutId())) {
+ toBuildCuboidSize.put(job.getBuildSourceInfo().getLayoutId(),
+ new AtomicLong(job.getBuildSourceInfo().getToBuildCuboids().size()));
+ }
+ }
+
completionService.submit(new Callable<JobResult>() {
@Override
public JobResult call() throws Exception {
@@ -52,6 +100,17 @@ public class BuildLayoutWithUpdate {
} catch (Throwable t) {
logger.error("Error occurred when run " + job.getName(), t);
throwable = t;
+ } finally {
+ //unpersist parent dataset
+ if (persistParentDataset && job.getBuildSourceInfo() != null && job.getBuildSourceInfo().getToBuildCuboids().size() > 1) {
+ long remain = toBuildCuboidSize.get(job.getBuildSourceInfo().getLayoutId()).decrementAndGet();
+ if (remain == 0) {
+ toBuildCuboidSize.remove(job.getBuildSourceInfo().getLayoutId());
+ layout2DataSet.get(job.getBuildSourceInfo().getLayoutId()).unpersist();
+ logger.info("dataset of layout: {} released", job.getBuildSourceInfo().getLayoutId());
+ semaphore.release();
+ }
+ }
}
return new JobResult(dataLayouts, throwable);
}
@@ -115,5 +174,7 @@ public class BuildLayoutWithUpdate {
public abstract String getName();
public abstract LayoutEntity build() throws IOException;
+
+ public abstract NBuildSourceInfo getBuildSourceInfo(); // build source info this job builds from; may be null (submit() null-checks it and treats null as a merge job with no parent dataset to persist)
}
}
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
index b463dad..efce341 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
@@ -157,7 +157,7 @@ public class CubeBuildJob extends SparkApplication {
logger.info("Triggered cube planner phase one .");
}
- buildLayoutWithUpdate = new BuildLayoutWithUpdate();
+ buildLayoutWithUpdate = new BuildLayoutWithUpdate(config);
List<String> persistedFlatTable = new ArrayList<>();
List<String> persistedViewFactTable = new ArrayList<>();
Path shareDir = config.getJobTmpShareDir(project, jobId);
@@ -297,7 +297,7 @@ public class CubeBuildJob extends SparkApplication {
}
}
- private void build(Collection<NBuildSourceInfo> buildSourceInfos, SegmentInfo seg, SpanningTree st) {
+ private void build(Collection<NBuildSourceInfo> buildSourceInfos, SegmentInfo seg, SpanningTree st) throws InterruptedException{
List<NBuildSourceInfo> theFirstLevelBuildInfos = buildLayer(buildSourceInfos, seg, st);
LinkedList<List<NBuildSourceInfo>> queue = new LinkedList<>();
@@ -318,7 +318,7 @@ public class CubeBuildJob extends SparkApplication {
// build current layer and return the next layer to be built.
private List<NBuildSourceInfo> buildLayer(Collection<NBuildSourceInfo> buildSourceInfos, SegmentInfo seg,
- SpanningTree st) {
+ SpanningTree st) throws InterruptedException{
int cuboidsNumInLayer = 0;
// build current layer
@@ -330,6 +330,11 @@ public class CubeBuildJob extends SparkApplication {
cuboidsNumInLayer += toBuildCuboids.size();
Preconditions.checkState(!toBuildCuboids.isEmpty(), "To be built cuboids is empty.");
Dataset<Row> parentDS = info.getParentDS();
+
+ if (toBuildCuboids.size() > 1) {
+ buildLayoutWithUpdate.cacheAndRegister(info.getLayoutId(), parentDS);
+ }
+
// record the source count of flat table
if (info.getLayoutId() == ParentSourceChooser.FLAT_TABLE_FLAG()) {
cuboidsRowCount.putIfAbsent(info.getLayoutId(), parentDS.count());
@@ -347,6 +352,11 @@ public class CubeBuildJob extends SparkApplication {
public LayoutEntity build() throws IOException {
return buildCuboid(seg, index, parentDS, st, info.getLayoutId());
}
+
+ @Override
+ public NBuildSourceInfo getBuildSourceInfo() { // exposes this job's build source info so BuildLayoutWithUpdate can track the parent dataset's remaining cuboids
+ return info;
+ }
}, config);
allIndexesInCurrentLayer.add(index);
}
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java
index 12e939d..83bc601 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java
@@ -28,6 +28,7 @@ import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.CubeUpdate;
+import org.apache.kylin.engine.spark.builder.NBuildSourceInfo;
import org.apache.kylin.engine.spark.metadata.SegmentInfo;
import org.apache.kylin.engine.spark.metadata.cube.ManagerHub;
import org.apache.kylin.engine.spark.metadata.cube.PathManager;
@@ -67,7 +68,7 @@ public class CubeMergeJob extends SparkApplication {
@Override
protected void doExecute() throws Exception {
- buildLayoutWithUpdate = new BuildLayoutWithUpdate();
+ buildLayoutWithUpdate = new BuildLayoutWithUpdate(config);
String cubeId = getParam(MetadataConstants.P_CUBE_ID);
String newSegmentId = getParam(MetadataConstants.P_SEGMENT_IDS);
final CubeManager cubeManager = CubeManager.getInstance(config);
@@ -117,6 +118,11 @@ public class CubeMergeJob extends SparkApplication {
public LayoutEntity build() throws IOException {
return saveAndUpdateCuboid(afterSort, mergedSegInfo, layout, assist);
}
+
+ @Override
+ public NBuildSourceInfo getBuildSourceInfo() {
+ return null; // merge jobs have no parent dataset to persist; BuildLayoutWithUpdate.submit() null-checks this value
+ }
}, config);
buildLayoutWithUpdate.updateLayout(mergedSegInfo, config);