You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by GitBox <gi...@apache.org> on 2019/03/01 12:42:13 UTC

[GitHub] KazydubB commented on a change in pull request #1640: DRILL-7038: Queries on partitioned columns scan the entire datasets

KazydubB commented on a change in pull request #1640: DRILL-7038: Queries on partitioned columns scan the entire datasets
URL: https://github.com/apache/drill/pull/1640#discussion_r261588501
 
 

 ##########
 File path: exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
 ##########
 @@ -550,4 +567,211 @@ private static void setPruneStatus(MetadataContext metaContext, PruneStatus prun
     }
   }
 
+  private static class PruneFilesOnScanRule extends PruneScanRule {
+
+    private final Pattern dirPattern;
+
+    /**
+     * Creates the rule for an {@link Aggregate} operand (with the {@code DRILL_LOGICAL} trait) over a
+     * {@link TableScan} operand, and prepares the pattern used to recognize directory columns.
+     *
+     * @param optimizerRulesContext context passed to the parent rule; also supplies the planner settings
+     *                              from which the partition column label is read
+     */
+    private PruneFilesOnScanRule(OptimizerRulesContext optimizerRulesContext) {
+      super(RelOptHelper.some(Aggregate.class, DrillRel.DRILL_LOGICAL, RelOptHelper.any(TableScan.class)),
+          "PruneFilesOnScanRule:Prune_On_Scan", optimizerRulesContext);
+      String partitionColumnLabel = optimizerRulesContext.getPlannerSettings().getFsPartitionColumnLabel();
+      // The label comes from planner settings, so quote it: it must be matched literally and never
+      // interpreted as regex syntax. A directory column is the label followed by one or more digits.
+      dirPattern = Pattern.compile(Pattern.quote(partitionColumnLabel) + "\\d+");
+    }
+
+    @Override
+    public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, TableScan scanRel) {
+      return new FileSystemPartitionDescriptor(settings, scanRel);
+    }
+
+    // Checks if query references directory columns only and has DISTINCT or GROUP BY operation
+    @Override
+    public boolean matches(RelOptRuleCall call) {
+      Aggregate aggregate = call.rel(0);
+      TableScan scan = call.rel(1);
+
+      if (!isQualifiedFilePruning(scan)
+          || scan.getRowType().getFieldCount() != aggregate.getRowType().getFieldCount()) {
+        return false;
+      }
+
+      List<String> fieldNames = scan.getRowType().getFieldNames();
+      // Check if select contains partition columns (dir0, dir1, dir2,..., dirN) only
+      for (String field : fieldNames) {
+        if (!dirPattern.matcher(field).matches()) {
+          return false;
+        }
+      }
+
+      return scan.isDistinct() || aggregate.getGroupCount() > 0;
+    }
+
+    /*
+      Transforms the Scan node into a DrillValuesRel node to avoid unnecessary scanning of the selected files.
+      If the metadata directories cache file exists, the directory column values are read from it;
+      otherwise the directories are gathered from the selection (PartitionLocations).
+      The DrillValuesRel node contains the gathered values as constant literals.
+
+      For example, plan for "select dir0, dir1 from `t` group by 1, 2", where table `t` has directory structure year/quarter
+
+      00-00    Screen
+      00-01      Project(dir0=[$0], dir1=[$1])
+      00-02        HashAgg(group=[{0, 1}])
+      00-03          Scan(table=[[t]], groupscan=[ParquetGroupScan [entries=[ReadEntryWithPath [path=file:/path/t/1996/Q4/orders_96_q4.parquet],
+        ReadEntryWithPath [path=file:/path/t/1996/Q1/file_96_q1.parquet], ReadEntryWithPath [path=file:/path/t/1996/Q3/file_96_q3.parquet],
+        ReadEntryWithPath [path=file:/path/t/1996/Q2/file_96_q2.parquet], ReadEntryWithPath [path=file:/path/t/1994/Q4/file_94_q4.parquet],
+        ReadEntryWithPath [path=file:/path/t/1994/Q1/file_94_q1.parquet], ReadEntryWithPath [path=file:/path/t/1994/Q3/file_94_q3.parquet],
+        ReadEntryWithPath [path=file:/path/t/1994/Q2/file_94_q2.parquet], ReadEntryWithPath [path=file:/path/t/1995/Q4/file_95_q4.parquet],
+        ReadEntryWithPath [path=file:/path/t/1995/Q1/file_95_q1.parquet], ReadEntryWithPath [path=file:/path/t/1995/Q3/file_95_q3.parquet],
+        ReadEntryWithPath [path=file:/path/t/1995/Q2/file_95_q2.parquet]], selectionRoot=file:/path/t, ..., columns=[`dir0`, `dir1`]]])
+
+      will be changed to
+
+      00-00    Screen
+      00-01      Project(dir0=[$0], dir1=[$1])
+      00-02        HashAgg(group=[{0, 1}])
+      00-03          Values(tuples=[[{ '1995', 'Q1' }, { '1994', 'Q4' }, { '1996', 'Q3' }, { '1996', 'Q2' }, { '1994', 'Q2' },
+        { '1995', 'Q4' }, { '1996', 'Q1' }, { '1995', 'Q3' }, { '1996', 'Q4' }, { '1994', 'Q3' }, { '1994', 'Q1' }, { '1995', 'Q2' }]])
+     */
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+      TableScan scan = call.rel(1); // operand 1: the scan under the aggregate (operand 0)
+
+      String pruningClassName = getClass().getName();
+      logger.debug("Beginning file partition pruning, pruning class: {}", pruningClassName);
+      Stopwatch totalPruningTime = logger.isDebugEnabled() ? Stopwatch.createStarted() : null; // timed only when debug logging is on
+
+      Object selection = getDrillTable(scan).getSelection();
+      MetadataContext metaContext = null;
+      FileSelection fileSelection = null;
+      if (selection instanceof FormatSelection) { // otherwise metaContext stays null and the cache branch below is skipped
+        fileSelection = ((FormatSelection) selection).getSelection();
+        metaContext = fileSelection.getMetaContext();
+      }
+
+      PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
+      PartitionDescriptor descriptor = getPartitionDescriptor(settings, scan);
+
+      List<String> fieldNames = scan.getRowType().getFieldNames();
+      List<String> values = Collections.emptyList();
+      List<Integer> indexes = new ArrayList<>(fieldNames.size()); // partition hierarchy index of each scanned field, in field order
+      for (String field : fieldNames) {
+        int index = descriptor.getPartitionHierarchyIndex(field);
+        indexes.add(index);
+      }
+
+      if (metaContext != null && metaContext.getDirectories() != null) {
+        // Dir metadata cache file exists
+        logger.debug("Using Metadata Directories cache");
+        values = getValues(fileSelection.getSelectionRoot(), metaContext.getDirectories(), indexes);
+      }
+
+      if (values.isEmpty()) { // cache branch not taken or it produced nothing: fall back to the partition locations
+        logger.debug("Not using Metadata Directories cache");
+        int batchIndex = 0;
+        // Outer loop: iterate over a list of batches of PartitionLocations
+        values = new ArrayList<>();
+        for (List<PartitionLocation> partitions : descriptor) {
+          logger.debug("Evaluating file partition pruning for batch {}", batchIndex);
+
+          try {
+            values.addAll(getValues(partitions, indexes));
+          } catch (Exception e) {
+            logger.warn("Exception while trying to prune files.", e);
+            if (totalPruningTime != null) {
+              logger.debug("Total pruning elapsed time: {} ms", totalPruningTime.elapsed(TimeUnit.MILLISECONDS));
+            }
+
+            // continue without partition pruning
+            return;
+          }
+          batchIndex++;
+        }
+
+        if (values.isEmpty()) {
+          // No changes are required
+          return; // NOTE(review): unlike the other exits, this one does not log the elapsed pruning time
+        }
+      }
+
+      try {
+        // Transform Scan node to DrillValuesRel node
+        List<RelDataTypeField> typeFields = new ArrayList<>(fieldNames.size());
+        RelDataTypeFactory typeFactory = scan.getCluster().getTypeFactory();
+
+        int i = 0;
+        for (String field : fieldNames) { // every field is typed as nullable VARCHAR of Types.MAX_VARCHAR_LENGTH
+          RelDataType dataType = typeFactory.createTypeWithNullability(
+              typeFactory.createSqlType(SqlTypeName.VARCHAR, Types.MAX_VARCHAR_LENGTH), true);
+          typeFields.add(new RelDataTypeFieldImpl(field, i++, dataType));
+        }
+        RelRecordType t = new RelRecordType(scan.getRowType().getStructKind(), typeFields);
+        RelNode newInput = DrillRelFactories.LOGICAL_BUILDER.create(scan.getCluster(), null)
+            .values(t, values.toArray()) // flat value list; presumably split into rows by field count - confirm against RelBuilder.values
+            .build();
+
+        RelTraitSet traits = newInput.getTraitSet().plus(DrillRel.DRILL_LOGICAL);
+        newInput = new DrillValuesRel( // re-wrap the built tuples in a DrillValuesRel carrying the DRILL_LOGICAL trait
+            newInput.getCluster(),
+            newInput.getRowType(),
+            ((LogicalValues) newInput).getTuples(), traits
+        );
+
+        Aggregate aggregate = call.rel(0);
+        Aggregate newAggregate = aggregate.copy( // same aggregate definition; only the input is replaced
+            aggregate.getTraitSet().plus(DrillRel.DRILL_LOGICAL),
+            newInput,
+            aggregate.indicator,
+            aggregate.getGroupSet(),
+            aggregate.getGroupSets(),
+            aggregate.getAggCallList()
+        );
+        call.transformTo(newAggregate);
+      } catch (Exception e) { // logged and not rethrown
+        logger.warn("Exception while using the pruned partitions.", e);
+      } finally {
+        if (totalPruningTime != null) {
+          logger.debug("Total pruning elapsed time: {} ms", totalPruningTime.elapsed(TimeUnit.MILLISECONDS));
+        }
+      }
+    }
+
+    private List<String> getValues(String selectionRoot, List<String> directories, List<Integer> indexes) {
+      List<String> values = new ArrayList<>();
+      for (String dir : directories) {
 
 Review comment:
   Thanks for the suggestion, but I find it harder to read.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services