Posted to commits@kylin.apache.org by xx...@apache.org on 2022/12/05 10:21:07 UTC

[kylin] 15/22: KYLIN-5319 Earlier Init Segment LayoutInfo In FilePruner

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 19232aba8f01b42360199c481ed08856f4e65522
Author: Jiale He <35...@users.noreply.github.com>
AuthorDate: Thu Sep 29 15:31:01 2022 +0800

    KYLIN-5319 Earlier Init Segment LayoutInfo In FilePruner
---
 .../kylin/metadata/cube/model/NDataSegment.java    |  4 +-
 .../kylin/metadata/cube/model/NDataflow.java       | 17 +++-
 .../metadata/cube/model/NDataflowManager.java      | 57 ++++++++++----
 .../kylin/metadata/cube/model/NDataflowTest.java   | 91 ++++++++++++++++++++--
 .../sql/execution/datasource/FilePruner.scala      | 21 ++---
 .../sql/execution/datasource/FilePrunerSuite.scala | 10 +--
 6 files changed, 161 insertions(+), 39 deletions(-)
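
The change in a nutshell: a segment's LayoutInfo used to be loaded lazily on
first access, which pushed metadata loading into query time inside FilePruner.
This commit adds NDataflowManager.getDataflow overloads that initialize
LayoutInfo up front, for all segments or for a chosen subset. A minimal usage
sketch, not taken from the commit; the project, dataflow and segment ids below
are made up:

    import java.util.Set;

    import org.apache.kylin.common.KylinConfig;
    import org.apache.kylin.metadata.cube.model.NDataflow;
    import org.apache.kylin.metadata.cube.model.NDataflowManager;

    import io.kyligence.kap.guava20.shaded.common.collect.Sets;

    public class EagerLayoutInfoExample {
        public static void main(String[] args) {
            KylinConfig config = KylinConfig.getInstanceFromEnv();
            NDataflowManager mgr = NDataflowManager.getInstance(config, "my_project");

            // Existing behavior: each segment's LayoutInfo loads lazily on first access.
            NDataflow lazy = mgr.getDataflow("df-uuid");

            // New overload: eagerly init LayoutInfo for every segment.
            NDataflow eager = mgr.getDataflow("df-uuid", true);

            // New overload: eagerly init LayoutInfo only for the segments a query touches.
            Set<String> segmentIds = Sets.newHashSet("seg-1", "seg-2");
            NDataflow pruned = mgr.getDataflow("df-uuid", segmentIds);
        }
    }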

diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataSegment.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataSegment.java
index 9f9c3dfec3..a3abc92b37 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataSegment.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataSegment.java
@@ -34,12 +34,12 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.RandomUtil;
 import org.apache.kylin.metadata.model.ISegment;
+import org.apache.kylin.metadata.model.NDataModel;
 import org.apache.kylin.metadata.model.SegmentRange;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.Segments;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.model.TimeRange;
-import org.apache.kylin.metadata.model.NDataModel;
 import org.apache.kylin.metadata.model.util.MultiPartitionUtil;
 
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
@@ -303,7 +303,7 @@ public class NDataSegment implements ISegment, Serializable {
         return getLayoutInfo().isAlreadyBuilt(layoutId);
     }
 
-    private LayoutInfo getLayoutInfo() {
+    public LayoutInfo getLayoutInfo() {
         if (layoutInfo == null) {
             synchronized (this) {
                 if (layoutInfo == null) {
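
The method made public above lazily builds the segment's LayoutInfo under
double-checked locking. A standalone sketch of that pattern, with a
hypothetical loadLayoutInfo() standing in for the real metadata load (note
the volatile field, which safe publication under this pattern requires):

    public class LazyLayoutHolder {
        // volatile is what makes double-checked locking publish safely
        private volatile LayoutInfo layoutInfo;

        public LayoutInfo getLayoutInfo() {
            if (layoutInfo == null) {                  // first check, lock-free
                synchronized (this) {
                    if (layoutInfo == null) {          // second check, under lock
                        layoutInfo = loadLayoutInfo(); // hypothetical loader
                    }
                }
            }
            return layoutInfo;
        }

        private LayoutInfo loadLayoutInfo() {
            return new LayoutInfo(); // stand-in for reading segment metadata
        }

        // placeholder type for this sketch only
        static class LayoutInfo {
        }
    }
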
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataflow.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataflow.java
index 37936ba4c0..58e3c3abb2 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataflow.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataflow.java
@@ -36,9 +36,13 @@ import org.apache.kylin.common.KylinConfigExt;
 import org.apache.kylin.common.persistence.MissingRootPersistentEntity;
 import org.apache.kylin.common.persistence.RootPersistentEntity;
 import org.apache.kylin.metadata.MetadataConstants;
+import org.apache.kylin.metadata.cube.optimization.FrequencyMap;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.IStorageAware;
 import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.NDataModel;
+import org.apache.kylin.metadata.model.NDataModelManager;
+import org.apache.kylin.metadata.model.NTableMetadataManager;
 import org.apache.kylin.metadata.model.SegmentRange;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.Segments;
@@ -48,10 +52,6 @@ import org.apache.kylin.metadata.realization.CapabilityResult;
 import org.apache.kylin.metadata.realization.IRealization;
 import org.apache.kylin.metadata.realization.RealizationStatusEnum;
 import org.apache.kylin.metadata.realization.SQLDigest;
-import org.apache.kylin.metadata.cube.optimization.FrequencyMap;
-import org.apache.kylin.metadata.model.NDataModel;
-import org.apache.kylin.metadata.model.NDataModelManager;
-import org.apache.kylin.metadata.model.NTableMetadataManager;
 
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
 import com.fasterxml.jackson.annotation.JsonIgnore;
@@ -585,4 +585,13 @@ public class NDataflow extends RootPersistentEntity implements Serializable, IRe
     public boolean hasReadySegments() {
         return isReady() && CollectionUtils.isNotEmpty(getQueryableSegments());
     }
+
+    public void initAllSegLayoutInfo() {
+        getSegments().forEach(NDataSegment::getLayoutInfo);
+    }
+
+    public void initSegLayoutInfoById(Set<String> segmentIdList) {
+        getSegments(segmentIdList).forEach(NDataSegment::getLayoutInfo);
+    }
+
 }
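
The two helpers added above do no work themselves; they only touch each
segment's lazy getter so LayoutInfo is materialized before queries arrive.
Written out without method references, initSegLayoutInfoById is roughly:

    // hedged paraphrase of the one-liner above, using the types in this diff
    public void initSegLayoutInfoById(Set<String> segmentIdList) {
        for (NDataSegment segment : getSegments(segmentIdList)) {
            segment.getLayoutInfo(); // called for its side effect: forces the lazy load
        }
    }
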
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataflowManager.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataflowManager.java
index 53eee8b63c..f22dd1838d 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataflowManager.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataflowManager.java
@@ -22,8 +22,8 @@ import static java.util.stream.Collectors.groupingBy;
 import static org.apache.kylin.common.exception.code.ErrorCodeServer.SEGMENT_MERGE_CHECK_INDEX_ILLEGAL;
 import static org.apache.kylin.common.exception.code.ErrorCodeServer.SEGMENT_MERGE_CHECK_PARTITION_ILLEGAL;
 import static org.apache.kylin.common.exception.code.ErrorCodeServer.SEGMENT_MERGE_CONTAINS_GAPS;
-import static org.apache.kylin.metadata.realization.RealizationStatusEnum.ONLINE;
 import static org.apache.kylin.common.util.SegmentMergeStorageChecker.checkMergeSegmentThreshold;
+import static org.apache.kylin.metadata.realization.RealizationStatusEnum.ONLINE;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -41,28 +41,28 @@ import java.util.stream.Collectors;
 
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
-import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.KylinConfigExt;
 import org.apache.kylin.common.exception.KylinException;
 import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.persistence.transaction.UnitOfWork;
 import org.apache.kylin.metadata.cachesync.CachedCrudAssist;
+import org.apache.kylin.metadata.model.ManagementType;
+import org.apache.kylin.metadata.model.NDataModel;
+import org.apache.kylin.metadata.model.NDataModelManager;
+import org.apache.kylin.metadata.model.NTableMetadataManager;
 import org.apache.kylin.metadata.model.SegmentRange;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.Segments;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TimeRange;
+import org.apache.kylin.metadata.model.util.scd2.SCD2CondChecker;
+import org.apache.kylin.metadata.project.NProjectManager;
 import org.apache.kylin.metadata.realization.IRealization;
 import org.apache.kylin.metadata.realization.IRealizationProvider;
 import org.apache.kylin.metadata.realization.RealizationStatusEnum;
-import org.apache.kylin.common.persistence.transaction.UnitOfWork;
-import org.apache.kylin.metadata.model.ManagementType;
-import org.apache.kylin.metadata.model.NDataModel;
-import org.apache.kylin.metadata.model.NDataModelManager;
-import org.apache.kylin.metadata.model.NTableMetadataManager;
-import org.apache.kylin.metadata.model.util.scd2.SCD2CondChecker;
-import org.apache.kylin.metadata.project.NProjectManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -258,10 +258,7 @@ public class NDataflowManager implements IRealizationProvider {
     }
 
     public NDataflow getDataflow(String id) {
-        if (StringUtils.isEmpty(id)) {
-            return null;
-        }
-        return crud.get(id);
+        return getDataflow(id, false);
     }
 
     public NDataflow getDataflowByModelAlias(String name) {
@@ -874,4 +871,38 @@ public class NDataflowManager implements IRealizationProvider {
         return offlineManually || isOfflineMultiPartitionModel || isOfflineScdModel;
     }
 
+    /**
+     * Get a dataflow, choosing whether to init all Segment LayoutInfo.
+     * Segment LayoutInfo is lazily loaded; it can be initialized eagerly if needed.
+     */
+    public NDataflow getDataflow(String id, boolean loadSegLayoutInfo) {
+        if (StringUtils.isEmpty(id)) {
+            return null;
+        }
+        NDataflow dataflow = crud.get(id);
+        if (!loadSegLayoutInfo) {
+            return dataflow;
+        }
+        dataflow.initAllSegLayoutInfo();
+        return dataflow;
+    }
+
+    /**
+     * Get a dataflow and init the specified Segment LayoutInfo.
+     */
+    public NDataflow getDataflow(String id, Set<String> segmentIds) {
+        if (StringUtils.isEmpty(id)) {
+            return null;
+        }
+        NDataflow dataflow = getDataflow(id, false);
+        if (CollectionUtils.isEmpty(segmentIds)) {
+            return dataflow;
+        }
+        if (Objects.isNull(dataflow)) {
+            return null;
+        }
+        dataflow.initSegLayoutInfoById(segmentIds);
+        return dataflow;
+    }
+
 }
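
The contract of the two overloads above, summarized as a hedged snippet (the
ids are hypothetical; the assertions mirror the unit tests added below):

    NDataflowManager mgr = NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), "my_project");

    // empty or null dataflow id -> null, for both overloads
    assert mgr.getDataflow(null, true) == null;
    assert mgr.getDataflow(null, Sets.newHashSet("s1")) == null;

    // unknown dataflow id with segment ids supplied -> null as well
    assert mgr.getDataflow("unknown-id", Sets.newHashSet("s1")) == null;

    // empty segment id set -> the dataflow is returned, nothing eagerly initialized
    NDataflow df = mgr.getDataflow("df-uuid", Sets.newHashSet());
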
diff --git a/src/core-metadata/src/test/java/org/apache/kylin/metadata/cube/model/NDataflowTest.java b/src/core-metadata/src/test/java/org/apache/kylin/metadata/cube/model/NDataflowTest.java
index f962c863a7..983d70c94b 100644
--- a/src/core-metadata/src/test/java/org/apache/kylin/metadata/cube/model/NDataflowTest.java
+++ b/src/core-metadata/src/test/java/org/apache/kylin/metadata/cube/model/NDataflowTest.java
@@ -20,19 +20,22 @@ package org.apache.kylin.metadata.cube.model;
 
 import java.io.IOException;
 
+import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
 import org.apache.kylin.metadata.model.Segments;
 import org.apache.kylin.metadata.realization.RealizationStatusEnum;
-import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.springframework.test.util.ReflectionTestUtils;
 
+import io.kyligence.kap.guava20.shaded.common.collect.Sets;
 import lombok.val;
 import lombok.var;
 
 public class NDataflowTest extends NLocalFileMetadataTestCase {
-    private String projectDefault = "default";
+    private final String projectDefault = "default";
+    private final String projectStreaming = "streaming_test";
 
     @Before
     public void setUp() throws Exception {
@@ -76,9 +79,8 @@ public class NDataflowTest extends NLocalFileMetadataTestCase {
         Assert.assertEquals(indexPlanConfig.base(), config.base());
         Assert.assertEquals(2, config.getExtendedOverrides().size());
 
-        indexPlanManager.updateIndexPlan("89af4ee2-2cdb-4b07-b39e-4c29856309aa", copyForWrite -> {
-            copyForWrite.getOverrideProps().put("test", "test");
-        });
+        indexPlanManager.updateIndexPlan("89af4ee2-2cdb-4b07-b39e-4c29856309aa",
+                copyForWrite -> copyForWrite.getOverrideProps().put("test", "test"));
 
         config = df.getConfig();
         Assert.assertEquals(indexPlanConfig.base(), config.base());
@@ -103,7 +105,7 @@ public class NDataflowTest extends NLocalFileMetadataTestCase {
 
     @Test
     public void testCollectPrecalculationResource_Streaming() {
-        val dsMgr = NDataflowManager.getInstance(getTestConfig(), "streaming_test");
+        val dsMgr = NDataflowManager.getInstance(getTestConfig(), projectStreaming);
         val df = dsMgr.getDataflow("4965c827-fbb4-4ea1-a744-3f341a3b030d");
         val strings = df.collectPrecalculationResource();
         Assert.assertEquals(7, strings.size());
@@ -122,4 +124,81 @@ public class NDataflowTest extends NLocalFileMetadataTestCase {
         Assert.assertTrue(
                 strings.stream().anyMatch(path -> path.startsWith("/streaming_test/kafka/DEFAULT.SSB_STREAMING.json")));
     }
+
+    @Test
+    public void testGetDataflow() {
+        val dsMgr = NDataflowManager.getInstance(getTestConfig(), projectStreaming);
+        {
+            val df = dsMgr.getDataflow(null);
+            Assert.assertNull(df);
+        }
+
+        {
+            val df = dsMgr.getDataflow("4965c827-fbb4-4ea1-a744-3f341a3b030d");
+            Assert.assertNotNull(df);
+        }
+
+        {
+            val df = dsMgr.getDataflow("4965c827-fbb4-4ea1-a744-3f341a3b030d-AAA", Sets.newHashSet("1"));
+            Assert.assertNull(df);
+        }
+
+        {
+            val df = dsMgr.getDataflow(null, Sets.newHashSet("1"));
+            Assert.assertNull(df);
+        }
+    }
+
+    @Test
+    public void testLazyLoadSegmentDetail() {
+        val fieldName = "layoutInfo";
+        val dsMgr = NDataflowManager.getInstance(getTestConfig(), projectStreaming);
+        val df = dsMgr.getDataflow("4965c827-fbb4-4ea1-a744-3f341a3b030d");
+
+        df.getSegments().forEach(segment -> {
+            // LayoutInfo is lazily initialized, so it is still null here
+            Object layoutInfoBefore = ReflectionTestUtils.getField(segment, fieldName);
+            Assert.assertNull(layoutInfoBefore);
+
+            // after triggering initialization, it is no longer null
+            segment.getLayoutInfo();
+            Object layoutInfoAfter = ReflectionTestUtils.getField(segment, fieldName);
+            Assert.assertNotNull(layoutInfoAfter);
+        });
+    }
+
+    @Test
+    public void testLoadSegmentDetail() {
+        val dsMgr = NDataflowManager.getInstance(getTestConfig(), projectStreaming);
+        // init Segment LayoutInfo immediately
+        val df = dsMgr.getDataflow("4965c827-fbb4-4ea1-a744-3f341a3b030d", true);
+        df.getSegments().forEach(segment -> {
+            val layoutInfoAfter = ReflectionTestUtils.getField(segment, "layoutInfo");
+            Assert.assertNotNull(layoutInfoAfter);
+        });
+    }
+
+    @Test
+    public void testLoadSpecifiedSegmentDetail() {
+        val dataflowId = "4965c827-fbb4-4ea1-a744-3f341a3b030d";
+        val segmentId = "3e560d22-b749-48c3-9f64-d4230207f120";
+        val fieldName = "layoutInfo";
+
+        val dsMgr = NDataflowManager.getInstance(getTestConfig(), projectStreaming);
+        {
+            val df = dsMgr.getDataflow(dataflowId, Sets.newHashSet());
+            val segment = df.getSegment(segmentId);
+            val layoutInfo = ReflectionTestUtils.getField(segment, fieldName);
+            Assert.assertNull(layoutInfo);
+        }
+
+        {
+            // init the specified segment's LayoutInfo
+            val df = dsMgr.getDataflow(dataflowId, Sets.newHashSet(segmentId));
+            val segmentAfter = df.getSegment(segmentId);
+            val layoutInfo = ReflectionTestUtils.getField(segmentAfter, fieldName);
+            Assert.assertNotNull(layoutInfo);
+        }
+    }
+
 }
diff --git a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
index 9937f3628b..8e8c046410 100644
--- a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
+++ b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
@@ -18,6 +18,8 @@
 
 package org.apache.spark.sql.execution.datasource
 
+import io.kyligence.kap.guava20.shaded.common.collect.Sets
+
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.kylin.common.exception.TargetSegmentNotFoundException
 import org.apache.kylin.common.util.{DateFormat, HadoopUtil}
@@ -39,6 +41,7 @@ import org.apache.spark.util.collection.BitSet
 import java.sql.{Date, Timestamp}
 import java.util
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 
 case class SegmentDirectory(segmentID: String, partitions: List[Long], files: Seq[FileStatus])
 
@@ -79,9 +82,11 @@ class FilePruner(val session: SparkSession,
   private val dataflow: NDataflow = {
     val dataflowId = options.getOrElse("dataflowId", sys.error("dataflowId option is required"))
     val prj = options.getOrElse("project", sys.error("project option is required"))
+    val prunedSegmentIds = Sets.newHashSet(prunedSegmentDirs.map(_.segmentID).asJavaCollection)
     val dfMgr = NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv, prj)
-    val dataflow = dfMgr.getDataflow(dataflowId)
-    FilePruner.checkSegmentStatus(prunedSegmentDirs, dataflow)
+    // init pruned Segment LayoutInfo immediately
+    val dataflow = dfMgr.getDataflow(dataflowId, prunedSegmentIds)
+    FilePruner.checkSegmentStatus(prunedSegmentIds, dataflow)
     dataflow
   }
 
@@ -546,14 +551,12 @@ object FilePruner {
     }
   }
 
-  def checkSegmentStatus(segDirs: Seq[SegmentDirectory], dataflow: NDataflow): Unit = {
+  def checkSegmentStatus(prunedSegmentIds: util.HashSet[String], dataflow: NDataflow): Unit = {
     // check whether each segment id corresponds to the segment in NDataflow
-    val candidateSegIds = new util.HashSet[String]
-    segDirs.foreach(seg => candidateSegIds.add(seg.segmentID))
-    val filterSegmentIds = dataflow.getSegments(candidateSegIds).asScala.map(e => e.getId).toSet
-    if(candidateSegIds.size != filterSegmentIds.size) {
-      val missSegId = new StringBuilder
-      candidateSegIds.asScala.foreach(e => {
+    val filterSegmentIds = dataflow.getSegments(prunedSegmentIds).asScala.map(e => e.getId).toSet
+    if (prunedSegmentIds.size != filterSegmentIds.size) {
+      val missSegId = new mutable.StringBuilder
+      prunedSegmentIds.asScala.foreach(e => {
         if (!filterSegmentIds.contains(e)) {
           missSegId.append(e).append(";")
         }
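
On the Java side, the reworked check above is just a set difference: any
pruned segment id that the dataflow no longer resolves is collected and the
query fails fast. A hedged Java rendering of the same logic; the second
parameter stands in for dataflow.getSegments(prunedSegmentIds), and a generic
exception replaces Kylin's TargetSegmentNotFoundException:

    import java.util.Set;

    public class SegmentStatusCheck {
        static void checkSegmentStatus(Set<String> prunedSegmentIds, Set<String> knownSegmentIds) {
            StringBuilder missSegId = new StringBuilder();
            for (String id : prunedSegmentIds) {
                if (!knownSegmentIds.contains(id)) {
                    missSegId.append(id).append(";");
                }
            }
            if (missSegId.length() > 0) {
                // the real code throws TargetSegmentNotFoundException
                throw new IllegalStateException(
                        "Cannot find target segment, and missing segment id: " + missSegId);
            }
        }
    }
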
diff --git a/src/spark-project/spark-common/src/test/scala/org/apache/spark/sql/execution/datasource/FilePrunerSuite.scala b/src/spark-project/spark-common/src/test/scala/org/apache/spark/sql/execution/datasource/FilePrunerSuite.scala
index 1bc6191e65..240bd85b86 100644
--- a/src/spark-project/spark-common/src/test/scala/org/apache/spark/sql/execution/datasource/FilePrunerSuite.scala
+++ b/src/spark-project/spark-common/src/test/scala/org/apache/spark/sql/execution/datasource/FilePrunerSuite.scala
@@ -18,11 +18,14 @@
 
 package org.apache.spark.sql.execution.datasource
 
+import io.kyligence.kap.guava20.shaded.common.collect.Sets
 import org.apache.kylin.common.exception.TargetSegmentNotFoundException
 import org.apache.kylin.metadata.cube.model.{NDataSegment, NDataflow}
 import org.apache.kylin.metadata.model.{SegmentStatusEnum, Segments}
 import org.apache.spark.sql.common.SparderBaseFunSuite
 
+import scala.collection.JavaConverters._
+
 class FilePrunerSuite extends SparderBaseFunSuite {
 
   test("KE-37730: test check segment status") {
@@ -37,12 +40,9 @@ class FilePrunerSuite extends SparderBaseFunSuite {
     val segDir1 = SegmentDirectory("1", List.empty[Long], null)
     val segDir2 = SegmentDirectory("2", List.empty[Long], null)
 
-    val segDirSeq1 = Seq(segDir1)
-    FilePruner.checkSegmentStatus(segDirSeq1, mockDataFlow)
-
-    val segDirSeq2 = Seq(segDir1, segDir2)
+    FilePruner.checkSegmentStatus(Sets.newHashSet(Seq(segDir1).map(_.segmentID).asJavaCollection), mockDataFlow)
     val catchEx = intercept[TargetSegmentNotFoundException] {
-      FilePruner.checkSegmentStatus(segDirSeq2, mockDataFlow)
+      FilePruner.checkSegmentStatus(Sets.newHashSet(Seq(segDir1, segDir2).map(_.segmentID).asJavaCollection), mockDataFlow)
     }
     assert(catchEx.getMessage.equals("Cannot find target segment, and missing segment id: 2;"))
   }