You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by GitBox <gi...@apache.org> on 2018/11/21 09:25:31 UTC

[GitHub] vvysotskyi commented on a change in pull request #1548: DRILL-6857: Read only required row groups in a file when limit push down is applied

vvysotskyi commented on a change in pull request #1548: DRILL-6857: Read only required row groups in a file when limit push down is applied
URL: https://github.com/apache/drill/pull/1548#discussion_r235305267
 
 

 ##########
 File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
 ##########
 @@ -330,29 +330,47 @@ public GroupScan applyLimit(int maxRecords) {
     maxRecords = Math.max(maxRecords, 1); // Make sure it request at least 1 row -> 1 rowGroup.
     // further optimization : minimize # of files chosen, or the affinity of files chosen.
 
+    if (parquetGroupScanStatistics.getRowCount() <= maxRecords) {
+      logger.debug("limit push down does not apply, since total number of rows [{}] is less or equal to the required [{}].",
+        parquetGroupScanStatistics.getRowCount(), maxRecords);
+      return null;
+    }
+
     // Calculate number of rowGroups to read based on maxRecords and update
     // number of records to read for each of those rowGroups.
-    int index = updateRowGroupInfo(maxRecords);
-
-    Set<String> filePaths = rowGroupInfos.subList(0, index).stream()
-        .map(ReadEntryWithPath::getPath)
-        .collect(Collectors.toSet()); // HashSet keeps a filePath unique.
+    List<RowGroupInfo> qualifiedRowGroups = new ArrayList<>(rowGroupInfos.size());
+    Set<String> qualifiedFilePath = new HashSet<>(); // HashSet keeps a fileName unique.
+    int currentRowCount = 0;
+    for (RowGroupInfo rowGroupInfo : rowGroupInfos) {
+      long rowCount = rowGroupInfo.getRowCount();
+      if (currentRowCount + rowCount <= maxRecords) {
+        currentRowCount += rowCount;
+        rowGroupInfo.setNumRecordsToRead(rowCount);
+        qualifiedRowGroups.add(rowGroupInfo);
+        qualifiedFilePath.add(rowGroupInfo.getPath());
+        continue;
+      } else if (currentRowCount < maxRecords) {
+        rowGroupInfo.setNumRecordsToRead(maxRecords - currentRowCount);
+        qualifiedRowGroups.add(rowGroupInfo);
+        qualifiedFilePath.add(rowGroupInfo.getPath());
+      }
+      break;
+    }
 
-    // If there is no change in fileSet, no need to create new groupScan.
-    if (filePaths.size() == fileSet.size() ) {
-      // There is no reduction of rowGroups. Return the original groupScan.
-      logger.debug("applyLimit() does not apply!");
+    if (rowGroupInfos.size() == qualifiedRowGroups.size()) {
+      logger.debug("limit push down does not apply, since number of row groups was not reduced.");
       return null;
     }
 
-    logger.debug("applyLimit() reduce parquet file # from {} to {}", fileSet.size(), filePaths.size());
+    logger.debug("applyLimit() reduce parquet row groups # from {} to {}.", rowGroupInfos.size(), qualifiedRowGroups.size());
 
     try {
-      AbstractParquetGroupScan newScan = cloneWithFileSelection(filePaths);
-      newScan.updateRowGroupInfo(maxRecords);
+      AbstractParquetGroupScan newScan = cloneWithFileSelection(qualifiedFilePath);
+      newScan.rowGroupInfos = qualifiedRowGroups;
+      newScan.parquetGroupScanStatistics.collect(newScan.rowGroupInfos, newScan.parquetTableMetadata);
 
 Review comment:
   Looks like `parquetGroupScanStatistics` was filled during the `newScan` creation. Should we call `parquetGroupScanStatistics.resetHolders()` before this line to avoid mixing previous and the new row group info?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services