You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2022/12/05 10:21:07 UTC
[kylin] 15/22: KYLIN-5319 Earlier Init Segment LayoutInfo In FilePruner
This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 19232aba8f01b42360199c481ed08856f4e65522
Author: Jiale He <35...@users.noreply.github.com>
AuthorDate: Thu Sep 29 15:31:01 2022 +0800
KYLIN-5319 Earlier Init Segment LayoutInfo In FilePruner
---
.../kylin/metadata/cube/model/NDataSegment.java | 4 +-
.../kylin/metadata/cube/model/NDataflow.java | 17 +++-
.../metadata/cube/model/NDataflowManager.java | 57 ++++++++++----
.../kylin/metadata/cube/model/NDataflowTest.java | 91 ++++++++++++++++++++--
.../sql/execution/datasource/FilePruner.scala | 21 ++---
.../sql/execution/datasource/FilePrunerSuite.scala | 10 +--
6 files changed, 161 insertions(+), 39 deletions(-)
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataSegment.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataSegment.java
index 9f9c3dfec3..a3abc92b37 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataSegment.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataSegment.java
@@ -34,12 +34,12 @@ import org.apache.commons.lang.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.RandomUtil;
import org.apache.kylin.metadata.model.ISegment;
+import org.apache.kylin.metadata.model.NDataModel;
import org.apache.kylin.metadata.model.SegmentRange;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.Segments;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.metadata.model.TimeRange;
-import org.apache.kylin.metadata.model.NDataModel;
import org.apache.kylin.metadata.model.util.MultiPartitionUtil;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
@@ -303,7 +303,7 @@ public class NDataSegment implements ISegment, Serializable {
return getLayoutInfo().isAlreadyBuilt(layoutId);
}
- private LayoutInfo getLayoutInfo() {
+ public LayoutInfo getLayoutInfo() {
if (layoutInfo == null) {
synchronized (this) {
if (layoutInfo == null) {
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataflow.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataflow.java
index 37936ba4c0..58e3c3abb2 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataflow.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataflow.java
@@ -36,9 +36,13 @@ import org.apache.kylin.common.KylinConfigExt;
import org.apache.kylin.common.persistence.MissingRootPersistentEntity;
import org.apache.kylin.common.persistence.RootPersistentEntity;
import org.apache.kylin.metadata.MetadataConstants;
+import org.apache.kylin.metadata.cube.optimization.FrequencyMap;
import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.IStorageAware;
import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.NDataModel;
+import org.apache.kylin.metadata.model.NDataModelManager;
+import org.apache.kylin.metadata.model.NTableMetadataManager;
import org.apache.kylin.metadata.model.SegmentRange;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.Segments;
@@ -48,10 +52,6 @@ import org.apache.kylin.metadata.realization.CapabilityResult;
import org.apache.kylin.metadata.realization.IRealization;
import org.apache.kylin.metadata.realization.RealizationStatusEnum;
import org.apache.kylin.metadata.realization.SQLDigest;
-import org.apache.kylin.metadata.cube.optimization.FrequencyMap;
-import org.apache.kylin.metadata.model.NDataModel;
-import org.apache.kylin.metadata.model.NDataModelManager;
-import org.apache.kylin.metadata.model.NTableMetadataManager;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonIgnore;
@@ -585,4 +585,13 @@ public class NDataflow extends RootPersistentEntity implements Serializable, IRe
public boolean hasReadySegments() {
return isReady() && CollectionUtils.isNotEmpty(getQueryableSegments());
}
+
+ public void initAllSegLayoutInfo() {
+ getSegments().forEach(NDataSegment::getLayoutInfo);
+ }
+
+ public void initSegLayoutInfoById(Set<String> segmentIdList) {
+ getSegments(segmentIdList).forEach(NDataSegment::getLayoutInfo);
+ }
+
}
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataflowManager.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataflowManager.java
index 53eee8b63c..f22dd1838d 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataflowManager.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataflowManager.java
@@ -22,8 +22,8 @@ import static java.util.stream.Collectors.groupingBy;
import static org.apache.kylin.common.exception.code.ErrorCodeServer.SEGMENT_MERGE_CHECK_INDEX_ILLEGAL;
import static org.apache.kylin.common.exception.code.ErrorCodeServer.SEGMENT_MERGE_CHECK_PARTITION_ILLEGAL;
import static org.apache.kylin.common.exception.code.ErrorCodeServer.SEGMENT_MERGE_CONTAINS_GAPS;
-import static org.apache.kylin.metadata.realization.RealizationStatusEnum.ONLINE;
import static org.apache.kylin.common.util.SegmentMergeStorageChecker.checkMergeSegmentThreshold;
+import static org.apache.kylin.metadata.realization.RealizationStatusEnum.ONLINE;
import java.util.ArrayList;
import java.util.Arrays;
@@ -41,28 +41,28 @@ import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
-import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.KylinConfigExt;
import org.apache.kylin.common.exception.KylinException;
import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.persistence.transaction.UnitOfWork;
import org.apache.kylin.metadata.cachesync.CachedCrudAssist;
+import org.apache.kylin.metadata.model.ManagementType;
+import org.apache.kylin.metadata.model.NDataModel;
+import org.apache.kylin.metadata.model.NDataModelManager;
+import org.apache.kylin.metadata.model.NTableMetadataManager;
import org.apache.kylin.metadata.model.SegmentRange;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.Segments;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.metadata.model.TimeRange;
+import org.apache.kylin.metadata.model.util.scd2.SCD2CondChecker;
+import org.apache.kylin.metadata.project.NProjectManager;
import org.apache.kylin.metadata.realization.IRealization;
import org.apache.kylin.metadata.realization.IRealizationProvider;
import org.apache.kylin.metadata.realization.RealizationStatusEnum;
-import org.apache.kylin.common.persistence.transaction.UnitOfWork;
-import org.apache.kylin.metadata.model.ManagementType;
-import org.apache.kylin.metadata.model.NDataModel;
-import org.apache.kylin.metadata.model.NDataModelManager;
-import org.apache.kylin.metadata.model.NTableMetadataManager;
-import org.apache.kylin.metadata.model.util.scd2.SCD2CondChecker;
-import org.apache.kylin.metadata.project.NProjectManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -258,10 +258,7 @@ public class NDataflowManager implements IRealizationProvider {
}
public NDataflow getDataflow(String id) {
- if (StringUtils.isEmpty(id)) {
- return null;
- }
- return crud.get(id);
+ return getDataflow(id, false);
}
public NDataflow getDataflowByModelAlias(String name) {
@@ -874,4 +871,38 @@ public class NDataflowManager implements IRealizationProvider {
return offlineManually || isOfflineMultiPartitionModel || isOfflineScdModel;
}
+ /**
+ * Get a dataflow and choose whether to initialize all Segment LayoutInfo.
+ * Segment LayoutInfo is lazily loaded; it can be loaded immediately if needed.
+ */
+ public NDataflow getDataflow(String id, boolean loadSegLayoutInfo) {
+ if (StringUtils.isEmpty(id)) {
+ return null;
+ }
+ NDataflow dataflow = crud.get(id);
+ if (!loadSegLayoutInfo) {
+ return dataflow;
+ }
+ dataflow.initAllSegLayoutInfo();
+ return dataflow;
+ }
+
+ /**
+ * Get a dataflow and initialize the specified segments' LayoutInfo.
+ */
+ public NDataflow getDataflow(String id, Set<String> segmentIds) {
+ if (StringUtils.isEmpty(id)) {
+ return null;
+ }
+ NDataflow dataflow = getDataflow(id, false);
+ if (CollectionUtils.isEmpty(segmentIds)) {
+ return dataflow;
+ }
+ if (Objects.isNull(dataflow)) {
+ return null;
+ }
+ dataflow.initSegLayoutInfoById(segmentIds);
+ return dataflow;
+ }
+
}
diff --git a/src/core-metadata/src/test/java/org/apache/kylin/metadata/cube/model/NDataflowTest.java b/src/core-metadata/src/test/java/org/apache/kylin/metadata/cube/model/NDataflowTest.java
index f962c863a7..983d70c94b 100644
--- a/src/core-metadata/src/test/java/org/apache/kylin/metadata/cube/model/NDataflowTest.java
+++ b/src/core-metadata/src/test/java/org/apache/kylin/metadata/cube/model/NDataflowTest.java
@@ -20,19 +20,22 @@ package org.apache.kylin.metadata.cube.model;
import java.io.IOException;
+import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
import org.apache.kylin.metadata.model.Segments;
import org.apache.kylin.metadata.realization.RealizationStatusEnum;
-import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.springframework.test.util.ReflectionTestUtils;
+import io.kyligence.kap.guava20.shaded.common.collect.Sets;
import lombok.val;
import lombok.var;
public class NDataflowTest extends NLocalFileMetadataTestCase {
- private String projectDefault = "default";
+ private final String projectDefault = "default";
+ private final String projectStreaming = "streaming_test";
@Before
public void setUp() throws Exception {
@@ -76,9 +79,8 @@ public class NDataflowTest extends NLocalFileMetadataTestCase {
Assert.assertEquals(indexPlanConfig.base(), config.base());
Assert.assertEquals(2, config.getExtendedOverrides().size());
- indexPlanManager.updateIndexPlan("89af4ee2-2cdb-4b07-b39e-4c29856309aa", copyForWrite -> {
- copyForWrite.getOverrideProps().put("test", "test");
- });
+ indexPlanManager.updateIndexPlan("89af4ee2-2cdb-4b07-b39e-4c29856309aa",
+ copyForWrite -> copyForWrite.getOverrideProps().put("test", "test"));
config = df.getConfig();
Assert.assertEquals(indexPlanConfig.base(), config.base());
@@ -103,7 +105,7 @@ public class NDataflowTest extends NLocalFileMetadataTestCase {
@Test
public void testCollectPrecalculationResource_Streaming() {
- val dsMgr = NDataflowManager.getInstance(getTestConfig(), "streaming_test");
+ val dsMgr = NDataflowManager.getInstance(getTestConfig(), projectStreaming);
val df = dsMgr.getDataflow("4965c827-fbb4-4ea1-a744-3f341a3b030d");
val strings = df.collectPrecalculationResource();
Assert.assertEquals(7, strings.size());
@@ -122,4 +124,81 @@ public class NDataflowTest extends NLocalFileMetadataTestCase {
Assert.assertTrue(
strings.stream().anyMatch(path -> path.startsWith("/streaming_test/kafka/DEFAULT.SSB_STREAMING.json")));
}
+
+ @Test
+ public void testGetDataflow() {
+ val dsMgr = NDataflowManager.getInstance(getTestConfig(), projectStreaming);
+ {
+ val df = dsMgr.getDataflow(null);
+ Assert.assertNull(df);
+ }
+
+ {
+ val df = dsMgr.getDataflow("4965c827-fbb4-4ea1-a744-3f341a3b030d");
+ Assert.assertNotNull(df);
+ }
+
+ {
+ val df = dsMgr.getDataflow("4965c827-fbb4-4ea1-a744-3f341a3b030d-AAA", Sets.newHashSet("1"));
+ Assert.assertNull(df);
+ }
+
+ {
+ val df = dsMgr.getDataflow(null, Sets.newHashSet("1"));
+ Assert.assertNull(df);
+ }
+ }
+
+ @Test
+ public void testLazyLoadSegmentDetail() {
+ val fieldName = "layoutInfo";
+ val dsMgr = NDataflowManager.getInstance(getTestConfig(), projectStreaming);
+ val df = dsMgr.getDataflow("4965c827-fbb4-4ea1-a744-3f341a3b030d");
+
+ df.getSegments().forEach(segment -> {
+ // lazy init Segment LayoutInfo, it is null
+ Object layoutInfoBefore = ReflectionTestUtils.getField(segment, fieldName);
+ Assert.assertNull(layoutInfoBefore);
+
+ // init Segment LayoutInfo, it is not null
+ segment.getLayoutInfo();
+ Object layoutInfoAfter = ReflectionTestUtils.getField(segment, fieldName);
+ Assert.assertNotNull(layoutInfoAfter);
+ });
+ }
+
+ @Test
+ public void testLoadSegmentDetail() {
+ val dsMgr = NDataflowManager.getInstance(getTestConfig(), projectStreaming);
+ // init Segment LayoutInfo right now
+ val df = dsMgr.getDataflow("4965c827-fbb4-4ea1-a744-3f341a3b030d", true);
+ df.getSegments().forEach(segment -> {
+ val layoutInfoAfter = ReflectionTestUtils.getField(segment, "layoutInfo");
+ Assert.assertNotNull(layoutInfoAfter);
+ });
+ }
+
+ @Test
+ public void testLoadSpecifiedSegmentDetail() {
+ val dataflowId = "4965c827-fbb4-4ea1-a744-3f341a3b030d";
+ val segmentId = "3e560d22-b749-48c3-9f64-d4230207f120";
+ val fieldName = "layoutInfo";
+
+ val dsMgr = NDataflowManager.getInstance(getTestConfig(), projectStreaming);
+ {
+ val df = dsMgr.getDataflow(dataflowId, Sets.newHashSet());
+ val segment = df.getSegment(segmentId);
+ val layoutInfo = ReflectionTestUtils.getField(segment, fieldName);
+ Assert.assertNull(layoutInfo);
+ }
+
+ {
+ // init the specified segment's LayoutInfo
+ val df = dsMgr.getDataflow(dataflowId, Sets.newHashSet(segmentId));
+ val segmentAfter = df.getSegment(segmentId);
+ val layoutInfo = ReflectionTestUtils.getField(segmentAfter, fieldName);
+ Assert.assertNotNull(layoutInfo);
+ }
+ }
+
}
diff --git a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
index 9937f3628b..8e8c046410 100644
--- a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
+++ b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
@@ -18,6 +18,8 @@
package org.apache.spark.sql.execution.datasource
+import io.kyligence.kap.guava20.shaded.common.collect.Sets
+
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.kylin.common.exception.TargetSegmentNotFoundException
import org.apache.kylin.common.util.{DateFormat, HadoopUtil}
@@ -39,6 +41,7 @@ import org.apache.spark.util.collection.BitSet
import java.sql.{Date, Timestamp}
import java.util
import scala.collection.JavaConverters._
+import scala.collection.mutable
case class SegmentDirectory(segmentID: String, partitions: List[Long], files: Seq[FileStatus])
@@ -79,9 +82,11 @@ class FilePruner(val session: SparkSession,
private val dataflow: NDataflow = {
val dataflowId = options.getOrElse("dataflowId", sys.error("dataflowId option is required"))
val prj = options.getOrElse("project", sys.error("project option is required"))
+ val prunedSegmentIds = Sets.newHashSet(prunedSegmentDirs.map(_.segmentID).asJavaCollection)
val dfMgr = NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv, prj)
- val dataflow = dfMgr.getDataflow(dataflowId)
- FilePruner.checkSegmentStatus(prunedSegmentDirs, dataflow)
+ // init pruned Segment LayoutInfo immediately
+ val dataflow = dfMgr.getDataflow(dataflowId, prunedSegmentIds)
+ FilePruner.checkSegmentStatus(prunedSegmentIds, dataflow)
dataflow
}
@@ -546,14 +551,12 @@ object FilePruner {
}
}
- def checkSegmentStatus(segDirs: Seq[SegmentDirectory], dataflow: NDataflow): Unit = {
+ def checkSegmentStatus(prunedSegmentIds: util.HashSet[String], dataflow: NDataflow): Unit = {
// check whether each segment id corresponds to the segment in NDataflow
- val candidateSegIds = new util.HashSet[String]
- segDirs.foreach(seg => candidateSegIds.add(seg.segmentID))
- val filterSegmentIds = dataflow.getSegments(candidateSegIds).asScala.map(e => e.getId).toSet
- if(candidateSegIds.size != filterSegmentIds.size) {
- val missSegId = new StringBuilder
- candidateSegIds.asScala.foreach(e => {
+ val filterSegmentIds = dataflow.getSegments(prunedSegmentIds).asScala.map(e => e.getId).toSet
+ if (prunedSegmentIds.size != filterSegmentIds.size) {
+ val missSegId = new mutable.StringBuilder
+ prunedSegmentIds.asScala.foreach(e => {
if (!filterSegmentIds.contains(e)) {
missSegId.append(e).append(";")
}
diff --git a/src/spark-project/spark-common/src/test/scala/org/apache/spark/sql/execution/datasource/FilePrunerSuite.scala b/src/spark-project/spark-common/src/test/scala/org/apache/spark/sql/execution/datasource/FilePrunerSuite.scala
index 1bc6191e65..240bd85b86 100644
--- a/src/spark-project/spark-common/src/test/scala/org/apache/spark/sql/execution/datasource/FilePrunerSuite.scala
+++ b/src/spark-project/spark-common/src/test/scala/org/apache/spark/sql/execution/datasource/FilePrunerSuite.scala
@@ -18,11 +18,14 @@
package org.apache.spark.sql.execution.datasource
+import io.kyligence.kap.guava20.shaded.common.collect.Sets
import org.apache.kylin.common.exception.TargetSegmentNotFoundException
import org.apache.kylin.metadata.cube.model.{NDataSegment, NDataflow}
import org.apache.kylin.metadata.model.{SegmentStatusEnum, Segments}
import org.apache.spark.sql.common.SparderBaseFunSuite
+import scala.collection.JavaConverters._
+
class FilePrunerSuite extends SparderBaseFunSuite {
test("KE-37730: test check segment status") {
@@ -37,12 +40,9 @@ class FilePrunerSuite extends SparderBaseFunSuite {
val segDir1 = SegmentDirectory("1", List.empty[Long], null)
val segDir2 = SegmentDirectory("2", List.empty[Long], null)
- val segDirSeq1 = Seq(segDir1)
- FilePruner.checkSegmentStatus(segDirSeq1, mockDataFlow)
-
- val segDirSeq2 = Seq(segDir1, segDir2)
+ FilePruner.checkSegmentStatus(Sets.newHashSet(Seq(segDir1).map(_.segmentID).asJavaCollection), mockDataFlow)
val catchEx = intercept[TargetSegmentNotFoundException] {
- FilePruner.checkSegmentStatus(segDirSeq2, mockDataFlow)
+ FilePruner.checkSegmentStatus(Sets.newHashSet(Seq(segDir1, segDir2).map(_.segmentID).asJavaCollection), mockDataFlow)
}
assert(catchEx.getMessage.equals("Cannot find target segment, and missing segment id: 2;"))
}