You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@carbondata.apache.org by GitBox <gi...@apache.org> on 2020/04/17 08:17:28 UTC

[GitHub] [carbondata] QiangCai commented on a change in pull request #3701: [CARBONDATA-3770] Improve partition count star query performance

QiangCai commented on a change in pull request #3701: [CARBONDATA-3770] Improve partition count star query performance
URL: https://github.com/apache/carbondata/pull/3701#discussion_r410065804
 
 

 ##########
 File path: integration/spark/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
 ##########
 @@ -108,4 +125,145 @@ case class CarbonCountStar(
     CarbonInputFormatUtil.setDataMapJobIfConfigured(job.getConfiguration)
     (job, carbonInputFormat)
   }
+
+  // The query flow for a pure partition count star is as follows:
+  // Step 1. check whether it is a pure partition count star by the filter
+  // Step 2. read tablestatus to get all valid segments, and remove the segment file cache of
+  // invalid and expired segments
+  // Step 3. use multiple threads to read the segment files which are not in cache and cache the
+  // index file list of each segment in memory. If its index files already exist in cache, there
+  // is no need to read them again.
+  // Step 4. use multiple threads to prune segments and partitions to get the pruned index file
+  // list, which can prune most index files and reduce the number of files.
+  // Step 5. read the count from the pruned index files directly and cache it; get it from cache
+  // if it exists in the index_file <-> rowCount map.
+  private def getRowCountPurePartitionPrune: Long = {
+    var rowCount: Long = 0
+    val prunedPartitionPaths = new java.util.ArrayList[String]()
+    // Get the current partitions from table.
+    val partitions = CarbonFilters.getPrunedPartitions(relation, predicates)
+    if (partitions != null) {
+      for (partition <- partitions) {
+        prunedPartitionPaths.add(partition.getLocation.toString)
+      }
+      val details = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
+      val validSegmentPaths = details.filter(segment =>
+        ((segment.getSegmentStatus == SegmentStatus.SUCCESS) ||
+          (segment.getSegmentStatus == SegmentStatus.LOAD_PARTIAL_SUCCESS))
+          && segment.getSegmentFile != null).map(segment => segment.getSegmentFile)
+      val tableSegmentIndexes = DataMapStoreManager.getInstance().getAllSegmentIndexes(
+        carbonTable.getTableId)
+      if (!tableSegmentIndexes.isEmpty) {
+        // clear invalid cache
+        for (segmentFilePathInCache <- tableSegmentIndexes.keySet().asScala) {
+          if (!validSegmentPaths.contains(segmentFilePathInCache)) {
+            // means invalid cache
+            tableSegmentIndexes.remove(segmentFilePathInCache)
+          }
+        }
+      }
+      // init and put absent the valid cache
+      for (validSegmentPath <- validSegmentPaths) {
+        if (tableSegmentIndexes.get(validSegmentPath) == null) {
+          val segmentIndexMeta = new SegmentIndexMeta(validSegmentPath)
+          tableSegmentIndexes.put(validSegmentPath, segmentIndexMeta)
+        }
+      }
+
+      val numThreads = Math.min(Math.max(validSegmentPaths.length, 1), 4)
 
 Review comment:
   @ajantha-bhat it tries to load only the required indexes into memory and avoids loading all indexes into the cache

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services