You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by amansinha100 <gi...@git.apache.org> on 2015/09/13 19:02:27 UTC

[GitHub] drill pull request: DRILL-3735: For partition pruning divide up th...

GitHub user amansinha100 opened a pull request:

    https://github.com/apache/drill/pull/156

    DRILL-3735: For partition pruning divide up the partition lists into …

    …sublists of 64K each and iterate over each sublist.
    
    Add abstract base class for various partition descriptors.  Add logging messages in PruneScanRule for better debuggability.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/amansinha100/incubator-drill partition9

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/drill/pull/156.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #156
    
----
commit dc079ad2cfa2813817564cc8bdd66356d0c6e59c
Author: Aman Sinha <as...@maprtech.com>
Date:   2015-09-12T19:57:12Z

    DRILL-3735: For partition pruning divide up the partition lists into sublists of 64K each and iterate over each sublist.
    
    Add abstract base class for various partition descriptors.  Add logging messages in PruneScanRule for better debuggability.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] drill pull request: DRILL-3735: For partition pruning divide up th...

Posted by jinfengni <gi...@git.apache.org>.
Github user jinfengni commented on a diff in the pull request:

    https://github.com/apache/drill/pull/156#discussion_r39478410
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java ---
    @@ -125,4 +117,16 @@ private String getBaseTableLocation() {
         final FormatSelection origSelection = (FormatSelection) scanRel.getDrillTable().getSelection();
         return origSelection.getSelection().selectionRoot;
       }
    +
    +  @Override
    +  protected void createPartitionSublists() {
    +    Set<String> fileLocations = ((ParquetGroupScan) scanRel.getGroupScan()).getFileSet();
    +    List<PartitionLocation> locations = new LinkedList<>();
    +    for (String file: fileLocations) {
    +      locations.add(new DFSPartitionLocation(MAX_NESTED_SUBDIRS, getBaseTableLocation(), file));
    --- End diff --
    
    Is fileLocation the absolute path to the file, or the relative path to the selectionRoot? If it's the former case, the file name could be quite long, considering that the directory could locate deep in the file system tree.  For instance, DRILL-1488 tried to consider the case where the file identifier could be as long as 1024, the default identifier max length.  So, in such extreme cases, we may hit heap memory issue, if we keep 100k long absolute path in the heap. Certainly, I agree it's not the normal cases, though it could happen.
    
     


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] drill pull request: DRILL-3735: For partition pruning divide up th...

Posted by jinfengni <gi...@git.apache.org>.
Github user jinfengni commented on the pull request:

    https://github.com/apache/drill/pull/156#issuecomment-142106704
  
    +1.   LGTM.
    



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] drill pull request: DRILL-3735: For partition pruning divide up th...

Posted by jinfengni <gi...@git.apache.org>.
Github user jinfengni commented on a diff in the pull request:

    https://github.com/apache/drill/pull/156#discussion_r39464031
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java ---
    @@ -176,81 +177,103 @@ protected void doOnMatch(RelOptRuleCall call, DrillFilterRel filterRel, DrillPro
         RexNode pruneCondition = c.getFinalCondition();
     
         if (pruneCondition == null) {
    +      logger.debug("No conditions were found eligible for partition pruning.");
           return;
         }
     
     
         // set up the partitions
    -    final GroupScan groupScan = scanRel.getGroupScan();
    -    List<PartitionLocation> partitions = descriptor.getPartitions();
    -
    -    if (partitions.size() > Character.MAX_VALUE) {
    -      return;
    -    }
    -
    -    final NullableBitVector output = new NullableBitVector(MaterializedField.create("", Types.optional(MinorType.BIT)), allocator);
    -    final VectorContainer container = new VectorContainer();
    -
    -    try {
    -      final ValueVector[] vectors = new ValueVector[descriptor.getMaxHierarchyLevel()];
    -      for (int partitionColumnIndex : BitSets.toIter(partitionColumnBitSet)) {
    -        SchemaPath column = SchemaPath.getSimplePath(fieldNameMap.get(partitionColumnIndex));
    -        MajorType type = descriptor.getVectorType(column, settings);
    -        MaterializedField field = MaterializedField.create(column, type);
    -        ValueVector v = TypeHelper.getNewVector(field, allocator);
    -        v.allocateNew();
    -        vectors[partitionColumnIndex] = v;
    -        container.add(v);
    -      }
    -
    -      // populate partition vectors.
    -      descriptor.populatePartitionVectors(vectors, partitions, partitionColumnBitSet, fieldNameMap);
    -
    -      // materialize the expression
    -      logger.debug("Attempting to prune {}", pruneCondition);
    -      final LogicalExpression expr = DrillOptiq.toDrill(new DrillParseContext(settings), scanRel, pruneCondition);
    -      final ErrorCollectorImpl errors = new ErrorCollectorImpl();
    -
    -      LogicalExpression materializedExpr = ExpressionTreeMaterializer.materialize(expr, container, errors, optimizerContext.getFunctionRegistry());
    -      // Make sure pruneCondition's materialized expression is always of BitType, so that
    -      // it's same as the type of output vector.
    -      if (materializedExpr.getMajorType().getMode() == TypeProtos.DataMode.REQUIRED) {
    -        materializedExpr = ExpressionTreeMaterializer.convertToNullableType(
    -            materializedExpr,
    -            materializedExpr.getMajorType().getMinorType(),
    -            optimizerContext.getFunctionRegistry(),
    -            errors);
    +    List<String> newFiles = Lists.newArrayList();
    +    long numTotal = 0; // total number of partitions
    +    int batchIndex = 0;
    +    String firstLocation = null;
    +
    +    // Outer loop: iterate over a list of batches of PartitionLocations
    +    for (List<PartitionLocation> partitions : descriptor) {
    +      numTotal += partitions.size();
    +      logger.debug("Evaluating partition pruning for batch {}", batchIndex);
    +      if (batchIndex == 0) { // save the first location in case everything is pruned
    +        firstLocation = partitions.get(0).getEntirePartitionLocation();
           }
    +      final NullableBitVector output = new NullableBitVector(MaterializedField.create("", Types.optional(MinorType.BIT)), allocator);
    +      final VectorContainer container = new VectorContainer();
    +
    +      try {
    +        final ValueVector[] vectors = new ValueVector[descriptor.getMaxHierarchyLevel()];
    +          for (int partitionColumnIndex : BitSets.toIter(partitionColumnBitSet)) {
    +          SchemaPath column = SchemaPath.getSimplePath(fieldNameMap.get(partitionColumnIndex));
    +          MajorType type = descriptor.getVectorType(column, settings);
    +          MaterializedField field = MaterializedField.create(column, type);
    +          ValueVector v = TypeHelper.getNewVector(field, allocator);
    +          v.allocateNew();
    +          vectors[partitionColumnIndex] = v;
    +          container.add(v);
    +        }
     
    -      if (errors.getErrorCount() != 0) {
    -        logger.warn("Failure while materializing expression [{}].  Errors: {}", expr, errors);
    -      }
    +        // populate partition vectors.
    +        descriptor.populatePartitionVectors(vectors, partitions, partitionColumnBitSet, fieldNameMap);
    +
    +        // materialize the expression
    +        logger.debug("Attempting to prune {}", pruneCondition);
    +        final LogicalExpression expr = DrillOptiq.toDrill(new DrillParseContext(settings), scanRel, pruneCondition);
    +        final ErrorCollectorImpl errors = new ErrorCollectorImpl();
    +
    +        LogicalExpression materializedExpr = ExpressionTreeMaterializer.materialize(expr, container, errors, optimizerContext.getFunctionRegistry());
    +        // Make sure pruneCondition's materialized expression is always of BitType, so that
    +        // it's same as the type of output vector.
    +        if (materializedExpr.getMajorType().getMode() == TypeProtos.DataMode.REQUIRED) {
    +          materializedExpr = ExpressionTreeMaterializer.convertToNullableType(
    +              materializedExpr,
    +              materializedExpr.getMajorType().getMinorType(),
    +              optimizerContext.getFunctionRegistry(),
    +              errors);
    +        }
     
    -      output.allocateNew(partitions.size());
    -      InterpreterEvaluator.evaluate(partitions.size(), optimizerContext, container, output, materializedExpr);
    -      int record = 0;
    +        if (errors.getErrorCount() != 0) {
    +          logger.warn("Failure while materializing expression [{}].  Errors: {}", expr, errors);
    +        }
     
    -      List<String> newFiles = Lists.newArrayList();
    -      for(PartitionLocation part: partitions){
    -        if(!output.getAccessor().isNull(record) && output.getAccessor().get(record) == 1){
    -          newFiles.add(part.getEntirePartitionLocation());
    +        output.allocateNew(partitions.size());
    +        InterpreterEvaluator.evaluate(partitions.size(), optimizerContext, container, output, materializedExpr);
    +        int recordCount = 0;
    +        int qualifiedCount = 0;
    +
    +        // Inner loop: within each batch iterate over the PartitionLocations
    +        for(PartitionLocation part: partitions){
    +          if(!output.getAccessor().isNull(recordCount) && output.getAccessor().get(recordCount) == 1){
    +            newFiles.add(part.getEntirePartitionLocation());
    +            qualifiedCount++;
    +          }
    +          recordCount++;
    +        }
    +        logger.debug("Within batch {}: total records: {}, qualified records: {}", batchIndex, recordCount, qualifiedCount);
    +        batchIndex++;
    +      } catch (Exception e) {
    +        logger.warn("Exception while trying to prune partition.", e);
    --- End diff --
    
    If there is Exception during partition pruning for one sublist, seems we just log the error as a warning. That means, the code could continue the logic after the "for-loop". I feel that might produce incorrect result, since the list of new partitions might be invalid. 
    
    Should we stop the execution of partition pruning, once an Exception is caught here?



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] drill pull request: DRILL-3735: For partition pruning divide up th...

Posted by amansinha100 <gi...@git.apache.org>.
Github user amansinha100 commented on a diff in the pull request:

    https://github.com/apache/drill/pull/156#discussion_r39474612
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java ---
    @@ -176,81 +177,103 @@ protected void doOnMatch(RelOptRuleCall call, DrillFilterRel filterRel, DrillPro
         RexNode pruneCondition = c.getFinalCondition();
     
         if (pruneCondition == null) {
    +      logger.debug("No conditions were found eligible for partition pruning.");
           return;
         }
     
     
         // set up the partitions
    -    final GroupScan groupScan = scanRel.getGroupScan();
    -    List<PartitionLocation> partitions = descriptor.getPartitions();
    -
    -    if (partitions.size() > Character.MAX_VALUE) {
    -      return;
    -    }
    -
    -    final NullableBitVector output = new NullableBitVector(MaterializedField.create("", Types.optional(MinorType.BIT)), allocator);
    -    final VectorContainer container = new VectorContainer();
    -
    -    try {
    -      final ValueVector[] vectors = new ValueVector[descriptor.getMaxHierarchyLevel()];
    -      for (int partitionColumnIndex : BitSets.toIter(partitionColumnBitSet)) {
    -        SchemaPath column = SchemaPath.getSimplePath(fieldNameMap.get(partitionColumnIndex));
    -        MajorType type = descriptor.getVectorType(column, settings);
    -        MaterializedField field = MaterializedField.create(column, type);
    -        ValueVector v = TypeHelper.getNewVector(field, allocator);
    -        v.allocateNew();
    -        vectors[partitionColumnIndex] = v;
    -        container.add(v);
    -      }
    -
    -      // populate partition vectors.
    -      descriptor.populatePartitionVectors(vectors, partitions, partitionColumnBitSet, fieldNameMap);
    -
    -      // materialize the expression
    -      logger.debug("Attempting to prune {}", pruneCondition);
    -      final LogicalExpression expr = DrillOptiq.toDrill(new DrillParseContext(settings), scanRel, pruneCondition);
    -      final ErrorCollectorImpl errors = new ErrorCollectorImpl();
    -
    -      LogicalExpression materializedExpr = ExpressionTreeMaterializer.materialize(expr, container, errors, optimizerContext.getFunctionRegistry());
    -      // Make sure pruneCondition's materialized expression is always of BitType, so that
    -      // it's same as the type of output vector.
    -      if (materializedExpr.getMajorType().getMode() == TypeProtos.DataMode.REQUIRED) {
    -        materializedExpr = ExpressionTreeMaterializer.convertToNullableType(
    -            materializedExpr,
    -            materializedExpr.getMajorType().getMinorType(),
    -            optimizerContext.getFunctionRegistry(),
    -            errors);
    +    List<String> newFiles = Lists.newArrayList();
    +    long numTotal = 0; // total number of partitions
    +    int batchIndex = 0;
    +    String firstLocation = null;
    +
    +    // Outer loop: iterate over a list of batches of PartitionLocations
    +    for (List<PartitionLocation> partitions : descriptor) {
    +      numTotal += partitions.size();
    +      logger.debug("Evaluating partition pruning for batch {}", batchIndex);
    +      if (batchIndex == 0) { // save the first location in case everything is pruned
    +        firstLocation = partitions.get(0).getEntirePartitionLocation();
           }
    +      final NullableBitVector output = new NullableBitVector(MaterializedField.create("", Types.optional(MinorType.BIT)), allocator);
    +      final VectorContainer container = new VectorContainer();
    +
    +      try {
    +        final ValueVector[] vectors = new ValueVector[descriptor.getMaxHierarchyLevel()];
    +          for (int partitionColumnIndex : BitSets.toIter(partitionColumnBitSet)) {
    +          SchemaPath column = SchemaPath.getSimplePath(fieldNameMap.get(partitionColumnIndex));
    +          MajorType type = descriptor.getVectorType(column, settings);
    +          MaterializedField field = MaterializedField.create(column, type);
    +          ValueVector v = TypeHelper.getNewVector(field, allocator);
    +          v.allocateNew();
    +          vectors[partitionColumnIndex] = v;
    +          container.add(v);
    +        }
     
    -      if (errors.getErrorCount() != 0) {
    -        logger.warn("Failure while materializing expression [{}].  Errors: {}", expr, errors);
    -      }
    +        // populate partition vectors.
    +        descriptor.populatePartitionVectors(vectors, partitions, partitionColumnBitSet, fieldNameMap);
    +
    +        // materialize the expression
    +        logger.debug("Attempting to prune {}", pruneCondition);
    +        final LogicalExpression expr = DrillOptiq.toDrill(new DrillParseContext(settings), scanRel, pruneCondition);
    +        final ErrorCollectorImpl errors = new ErrorCollectorImpl();
    +
    +        LogicalExpression materializedExpr = ExpressionTreeMaterializer.materialize(expr, container, errors, optimizerContext.getFunctionRegistry());
    +        // Make sure pruneCondition's materialized expression is always of BitType, so that
    +        // it's same as the type of output vector.
    +        if (materializedExpr.getMajorType().getMode() == TypeProtos.DataMode.REQUIRED) {
    +          materializedExpr = ExpressionTreeMaterializer.convertToNullableType(
    +              materializedExpr,
    +              materializedExpr.getMajorType().getMinorType(),
    +              optimizerContext.getFunctionRegistry(),
    +              errors);
    +        }
     
    -      output.allocateNew(partitions.size());
    -      InterpreterEvaluator.evaluate(partitions.size(), optimizerContext, container, output, materializedExpr);
    -      int record = 0;
    +        if (errors.getErrorCount() != 0) {
    --- End diff --
    
    Agree...I will move the materialization outside the loop.  Instead of throwing Exception (which PP rule is explicitly trying to avoid), we could log the warning about materialization error and return. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] drill pull request: DRILL-3735: For partition pruning divide up th...

Posted by amansinha100 <gi...@git.apache.org>.
Github user amansinha100 commented on a diff in the pull request:

    https://github.com/apache/drill/pull/156#discussion_r39474948
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java ---
    @@ -125,4 +117,16 @@ private String getBaseTableLocation() {
         final FormatSelection origSelection = (FormatSelection) scanRel.getDrillTable().getSelection();
         return origSelection.getSelection().selectionRoot;
       }
    +
    +  @Override
    +  protected void createPartitionSublists() {
    +    Set<String> fileLocations = ((ParquetGroupScan) scanRel.getGroupScan()).getFileSet();
    +    List<PartitionLocation> locations = new LinkedList<>();
    +    for (String file: fileLocations) {
    +      locations.add(new DFSPartitionLocation(MAX_NESTED_SUBDIRS, getBaseTableLocation(), file));
    --- End diff --
    
    Actually, this patch was not about reducing memory footprint per se.  It was to eliminate the 64K files limit for partition pruning.  The above function logic is the same as we had before for getPartitions() plus the new splitting of the list into sublists.  The long filenames seem less of an issue for the JVM heap usage. Suppose we have 100K files each with name length 200 bytes.  This is 20MB which is relatively low compared to the heap size.   However, we should try to build a better framework for propagating the filenames throughout the planning process.  Right now, methods such as FormatSelection.getAsFiles() populate all the filenames as once.   Ideally, these could also expose an iterator model. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] drill pull request: DRILL-3735: For partition pruning divide up th...

Posted by amansinha100 <gi...@git.apache.org>.
Github user amansinha100 commented on a diff in the pull request:

    https://github.com/apache/drill/pull/156#discussion_r39528224
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java ---
    @@ -125,4 +117,16 @@ private String getBaseTableLocation() {
         final FormatSelection origSelection = (FormatSelection) scanRel.getDrillTable().getSelection();
         return origSelection.getSelection().selectionRoot;
       }
    +
    +  @Override
    +  protected void createPartitionSublists() {
    +    Set<String> fileLocations = ((ParquetGroupScan) scanRel.getGroupScan()).getFileSet();
    +    List<PartitionLocation> locations = new LinkedList<>();
    +    for (String file: fileLocations) {
    +      locations.add(new DFSPartitionLocation(MAX_NESTED_SUBDIRS, getBaseTableLocation(), file));
    --- End diff --
    
    Long file names are an issue not just for partition pruning but metadata in general..that's what I was saying previously about FormatSelection.getAsFiles() and ParquetGroupScan.getFileSet() etc.  If we want to put the names into direct memory rather than heap, then a broader change is needed.   We should have a separate JIRA for that I think. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] drill pull request: DRILL-3735: For partition pruning divide up th...

Posted by amansinha100 <gi...@git.apache.org>.
Github user amansinha100 commented on a diff in the pull request:

    https://github.com/apache/drill/pull/156#discussion_r39527135
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java ---
    @@ -176,81 +177,103 @@ protected void doOnMatch(RelOptRuleCall call, DrillFilterRel filterRel, DrillPro
         RexNode pruneCondition = c.getFinalCondition();
     
         if (pruneCondition == null) {
    +      logger.debug("No conditions were found eligible for partition pruning.");
           return;
         }
     
     
         // set up the partitions
    -    final GroupScan groupScan = scanRel.getGroupScan();
    -    List<PartitionLocation> partitions = descriptor.getPartitions();
    -
    -    if (partitions.size() > Character.MAX_VALUE) {
    -      return;
    -    }
    -
    -    final NullableBitVector output = new NullableBitVector(MaterializedField.create("", Types.optional(MinorType.BIT)), allocator);
    -    final VectorContainer container = new VectorContainer();
    -
    -    try {
    -      final ValueVector[] vectors = new ValueVector[descriptor.getMaxHierarchyLevel()];
    -      for (int partitionColumnIndex : BitSets.toIter(partitionColumnBitSet)) {
    -        SchemaPath column = SchemaPath.getSimplePath(fieldNameMap.get(partitionColumnIndex));
    -        MajorType type = descriptor.getVectorType(column, settings);
    -        MaterializedField field = MaterializedField.create(column, type);
    -        ValueVector v = TypeHelper.getNewVector(field, allocator);
    -        v.allocateNew();
    -        vectors[partitionColumnIndex] = v;
    -        container.add(v);
    -      }
    -
    -      // populate partition vectors.
    -      descriptor.populatePartitionVectors(vectors, partitions, partitionColumnBitSet, fieldNameMap);
    -
    -      // materialize the expression
    -      logger.debug("Attempting to prune {}", pruneCondition);
    -      final LogicalExpression expr = DrillOptiq.toDrill(new DrillParseContext(settings), scanRel, pruneCondition);
    -      final ErrorCollectorImpl errors = new ErrorCollectorImpl();
    -
    -      LogicalExpression materializedExpr = ExpressionTreeMaterializer.materialize(expr, container, errors, optimizerContext.getFunctionRegistry());
    -      // Make sure pruneCondition's materialized expression is always of BitType, so that
    -      // it's same as the type of output vector.
    -      if (materializedExpr.getMajorType().getMode() == TypeProtos.DataMode.REQUIRED) {
    -        materializedExpr = ExpressionTreeMaterializer.convertToNullableType(
    -            materializedExpr,
    -            materializedExpr.getMajorType().getMinorType(),
    -            optimizerContext.getFunctionRegistry(),
    -            errors);
    +    List<String> newFiles = Lists.newArrayList();
    +    long numTotal = 0; // total number of partitions
    +    int batchIndex = 0;
    +    String firstLocation = null;
    +
    +    // Outer loop: iterate over a list of batches of PartitionLocations
    +    for (List<PartitionLocation> partitions : descriptor) {
    +      numTotal += partitions.size();
    +      logger.debug("Evaluating partition pruning for batch {}", batchIndex);
    +      if (batchIndex == 0) { // save the first location in case everything is pruned
    +        firstLocation = partitions.get(0).getEntirePartitionLocation();
           }
    +      final NullableBitVector output = new NullableBitVector(MaterializedField.create("", Types.optional(MinorType.BIT)), allocator);
    +      final VectorContainer container = new VectorContainer();
    +
    +      try {
    +        final ValueVector[] vectors = new ValueVector[descriptor.getMaxHierarchyLevel()];
    +          for (int partitionColumnIndex : BitSets.toIter(partitionColumnBitSet)) {
    +          SchemaPath column = SchemaPath.getSimplePath(fieldNameMap.get(partitionColumnIndex));
    +          MajorType type = descriptor.getVectorType(column, settings);
    +          MaterializedField field = MaterializedField.create(column, type);
    +          ValueVector v = TypeHelper.getNewVector(field, allocator);
    +          v.allocateNew();
    +          vectors[partitionColumnIndex] = v;
    +          container.add(v);
    +        }
     
    -      if (errors.getErrorCount() != 0) {
    -        logger.warn("Failure while materializing expression [{}].  Errors: {}", expr, errors);
    -      }
    +        // populate partition vectors.
    +        descriptor.populatePartitionVectors(vectors, partitions, partitionColumnBitSet, fieldNameMap);
    +
    +        // materialize the expression
    +        logger.debug("Attempting to prune {}", pruneCondition);
    +        final LogicalExpression expr = DrillOptiq.toDrill(new DrillParseContext(settings), scanRel, pruneCondition);
    +        final ErrorCollectorImpl errors = new ErrorCollectorImpl();
    +
    +        LogicalExpression materializedExpr = ExpressionTreeMaterializer.materialize(expr, container, errors, optimizerContext.getFunctionRegistry());
    +        // Make sure pruneCondition's materialized expression is always of BitType, so that
    +        // it's same as the type of output vector.
    +        if (materializedExpr.getMajorType().getMode() == TypeProtos.DataMode.REQUIRED) {
    +          materializedExpr = ExpressionTreeMaterializer.convertToNullableType(
    +              materializedExpr,
    +              materializedExpr.getMajorType().getMinorType(),
    +              optimizerContext.getFunctionRegistry(),
    +              errors);
    +        }
     
    -      output.allocateNew(partitions.size());
    -      InterpreterEvaluator.evaluate(partitions.size(), optimizerContext, container, output, materializedExpr);
    -      int record = 0;
    +        if (errors.getErrorCount() != 0) {
    --- End diff --
    
    Actually, the ExpressionTreeMaterializer.visitSchemaPath() needs access to the VectorContainer, so doing this materialization outside the loop is not an option since the VectorContainer is initialized inside the loop.  But we can do this only once by putting it inside the if (batchIndex == 0)  condition.  In fact, the container itself should be created only once and cleared in each iteration.  I will make that change. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] drill pull request: DRILL-3735: For partition pruning divide up th...

Posted by jinfengni <gi...@git.apache.org>.
Github user jinfengni commented on a diff in the pull request:

    https://github.com/apache/drill/pull/156#discussion_r39463605
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java ---
    @@ -176,81 +177,103 @@ protected void doOnMatch(RelOptRuleCall call, DrillFilterRel filterRel, DrillPro
         RexNode pruneCondition = c.getFinalCondition();
     
         if (pruneCondition == null) {
    +      logger.debug("No conditions were found eligible for partition pruning.");
           return;
         }
     
     
         // set up the partitions
    -    final GroupScan groupScan = scanRel.getGroupScan();
    -    List<PartitionLocation> partitions = descriptor.getPartitions();
    -
    -    if (partitions.size() > Character.MAX_VALUE) {
    -      return;
    -    }
    -
    -    final NullableBitVector output = new NullableBitVector(MaterializedField.create("", Types.optional(MinorType.BIT)), allocator);
    -    final VectorContainer container = new VectorContainer();
    -
    -    try {
    -      final ValueVector[] vectors = new ValueVector[descriptor.getMaxHierarchyLevel()];
    -      for (int partitionColumnIndex : BitSets.toIter(partitionColumnBitSet)) {
    -        SchemaPath column = SchemaPath.getSimplePath(fieldNameMap.get(partitionColumnIndex));
    -        MajorType type = descriptor.getVectorType(column, settings);
    -        MaterializedField field = MaterializedField.create(column, type);
    -        ValueVector v = TypeHelper.getNewVector(field, allocator);
    -        v.allocateNew();
    -        vectors[partitionColumnIndex] = v;
    -        container.add(v);
    -      }
    -
    -      // populate partition vectors.
    -      descriptor.populatePartitionVectors(vectors, partitions, partitionColumnBitSet, fieldNameMap);
    -
    -      // materialize the expression
    -      logger.debug("Attempting to prune {}", pruneCondition);
    -      final LogicalExpression expr = DrillOptiq.toDrill(new DrillParseContext(settings), scanRel, pruneCondition);
    -      final ErrorCollectorImpl errors = new ErrorCollectorImpl();
    -
    -      LogicalExpression materializedExpr = ExpressionTreeMaterializer.materialize(expr, container, errors, optimizerContext.getFunctionRegistry());
    -      // Make sure pruneCondition's materialized expression is always of BitType, so that
    -      // it's same as the type of output vector.
    -      if (materializedExpr.getMajorType().getMode() == TypeProtos.DataMode.REQUIRED) {
    -        materializedExpr = ExpressionTreeMaterializer.convertToNullableType(
    -            materializedExpr,
    -            materializedExpr.getMajorType().getMinorType(),
    -            optimizerContext.getFunctionRegistry(),
    -            errors);
    +    List<String> newFiles = Lists.newArrayList();
    +    long numTotal = 0; // total number of partitions
    +    int batchIndex = 0;
    +    String firstLocation = null;
    +
    +    // Outer loop: iterate over a list of batches of PartitionLocations
    +    for (List<PartitionLocation> partitions : descriptor) {
    +      numTotal += partitions.size();
    +      logger.debug("Evaluating partition pruning for batch {}", batchIndex);
    +      if (batchIndex == 0) { // save the first location in case everything is pruned
    +        firstLocation = partitions.get(0).getEntirePartitionLocation();
           }
    +      final NullableBitVector output = new NullableBitVector(MaterializedField.create("", Types.optional(MinorType.BIT)), allocator);
    +      final VectorContainer container = new VectorContainer();
    +
    +      try {
    +        final ValueVector[] vectors = new ValueVector[descriptor.getMaxHierarchyLevel()];
    +          for (int partitionColumnIndex : BitSets.toIter(partitionColumnBitSet)) {
    +          SchemaPath column = SchemaPath.getSimplePath(fieldNameMap.get(partitionColumnIndex));
    +          MajorType type = descriptor.getVectorType(column, settings);
    +          MaterializedField field = MaterializedField.create(column, type);
    +          ValueVector v = TypeHelper.getNewVector(field, allocator);
    +          v.allocateNew();
    +          vectors[partitionColumnIndex] = v;
    +          container.add(v);
    +        }
     
    -      if (errors.getErrorCount() != 0) {
    -        logger.warn("Failure while materializing expression [{}].  Errors: {}", expr, errors);
    -      }
    +        // populate partition vectors.
    +        descriptor.populatePartitionVectors(vectors, partitions, partitionColumnBitSet, fieldNameMap);
    +
    +        // materialize the expression
    +        logger.debug("Attempting to prune {}", pruneCondition);
    +        final LogicalExpression expr = DrillOptiq.toDrill(new DrillParseContext(settings), scanRel, pruneCondition);
    +        final ErrorCollectorImpl errors = new ErrorCollectorImpl();
    +
    +        LogicalExpression materializedExpr = ExpressionTreeMaterializer.materialize(expr, container, errors, optimizerContext.getFunctionRegistry());
    +        // Make sure pruneCondition's materialized expression is always of BitType, so that
    +        // it's same as the type of output vector.
    +        if (materializedExpr.getMajorType().getMode() == TypeProtos.DataMode.REQUIRED) {
    +          materializedExpr = ExpressionTreeMaterializer.convertToNullableType(
    +              materializedExpr,
    +              materializedExpr.getMajorType().getMinorType(),
    +              optimizerContext.getFunctionRegistry(),
    +              errors);
    +        }
     
    -      output.allocateNew(partitions.size());
    -      InterpreterEvaluator.evaluate(partitions.size(), optimizerContext, container, output, materializedExpr);
    -      int record = 0;
    +        if (errors.getErrorCount() != 0) {
    --- End diff --
    
    If expression materializer reports error, is it better to stop the execution of partition pruning rule, and raise Exception here? In such case, I feel it's likely that the Interperter would hit error as well.
    
    Also, is the condition expression same across multiple sub-list of partition locations? If that's the case, is it better to move the logic of expression materialization out of this for loop? We do not have to do materialization every time when we process one sublist.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] drill pull request: DRILL-3735: For partition pruning divide up th...

Posted by amansinha100 <gi...@git.apache.org>.
Github user amansinha100 commented on a diff in the pull request:

    https://github.com/apache/drill/pull/156#discussion_r39527473
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java ---
    @@ -176,81 +177,103 @@ protected void doOnMatch(RelOptRuleCall call, DrillFilterRel filterRel, DrillPro
         RexNode pruneCondition = c.getFinalCondition();
     
         if (pruneCondition == null) {
    +      logger.debug("No conditions were found eligible for partition pruning.");
           return;
         }
     
     
         // set up the partitions
    -    final GroupScan groupScan = scanRel.getGroupScan();
    -    List<PartitionLocation> partitions = descriptor.getPartitions();
    -
    -    if (partitions.size() > Character.MAX_VALUE) {
    -      return;
    -    }
    -
    -    final NullableBitVector output = new NullableBitVector(MaterializedField.create("", Types.optional(MinorType.BIT)), allocator);
    -    final VectorContainer container = new VectorContainer();
    -
    -    try {
    -      final ValueVector[] vectors = new ValueVector[descriptor.getMaxHierarchyLevel()];
    -      for (int partitionColumnIndex : BitSets.toIter(partitionColumnBitSet)) {
    -        SchemaPath column = SchemaPath.getSimplePath(fieldNameMap.get(partitionColumnIndex));
    -        MajorType type = descriptor.getVectorType(column, settings);
    -        MaterializedField field = MaterializedField.create(column, type);
    -        ValueVector v = TypeHelper.getNewVector(field, allocator);
    -        v.allocateNew();
    -        vectors[partitionColumnIndex] = v;
    -        container.add(v);
    -      }
    -
    -      // populate partition vectors.
    -      descriptor.populatePartitionVectors(vectors, partitions, partitionColumnBitSet, fieldNameMap);
    -
    -      // materialize the expression
    -      logger.debug("Attempting to prune {}", pruneCondition);
    -      final LogicalExpression expr = DrillOptiq.toDrill(new DrillParseContext(settings), scanRel, pruneCondition);
    -      final ErrorCollectorImpl errors = new ErrorCollectorImpl();
    -
    -      LogicalExpression materializedExpr = ExpressionTreeMaterializer.materialize(expr, container, errors, optimizerContext.getFunctionRegistry());
    -      // Make sure pruneCondition's materialized expression is always of BitType, so that
    -      // it's same as the type of output vector.
    -      if (materializedExpr.getMajorType().getMode() == TypeProtos.DataMode.REQUIRED) {
    -        materializedExpr = ExpressionTreeMaterializer.convertToNullableType(
    -            materializedExpr,
    -            materializedExpr.getMajorType().getMinorType(),
    -            optimizerContext.getFunctionRegistry(),
    -            errors);
    +    List<String> newFiles = Lists.newArrayList();
    +    long numTotal = 0; // total number of partitions
    +    int batchIndex = 0;
    +    String firstLocation = null;
    +
    +    // Outer loop: iterate over a list of batches of PartitionLocations
    +    for (List<PartitionLocation> partitions : descriptor) {
    +      numTotal += partitions.size();
    +      logger.debug("Evaluating partition pruning for batch {}", batchIndex);
    +      if (batchIndex == 0) { // save the first location in case everything is pruned
    +        firstLocation = partitions.get(0).getEntirePartitionLocation();
           }
    +      final NullableBitVector output = new NullableBitVector(MaterializedField.create("", Types.optional(MinorType.BIT)), allocator);
    +      final VectorContainer container = new VectorContainer();
    +
    +      try {
    +        final ValueVector[] vectors = new ValueVector[descriptor.getMaxHierarchyLevel()];
    +          for (int partitionColumnIndex : BitSets.toIter(partitionColumnBitSet)) {
    +          SchemaPath column = SchemaPath.getSimplePath(fieldNameMap.get(partitionColumnIndex));
    +          MajorType type = descriptor.getVectorType(column, settings);
    +          MaterializedField field = MaterializedField.create(column, type);
    +          ValueVector v = TypeHelper.getNewVector(field, allocator);
    +          v.allocateNew();
    +          vectors[partitionColumnIndex] = v;
    +          container.add(v);
    +        }
     
    -      if (errors.getErrorCount() != 0) {
    -        logger.warn("Failure while materializing expression [{}].  Errors: {}", expr, errors);
    -      }
    +        // populate partition vectors.
    +        descriptor.populatePartitionVectors(vectors, partitions, partitionColumnBitSet, fieldNameMap);
    +
    +        // materialize the expression
    +        logger.debug("Attempting to prune {}", pruneCondition);
    +        final LogicalExpression expr = DrillOptiq.toDrill(new DrillParseContext(settings), scanRel, pruneCondition);
    +        final ErrorCollectorImpl errors = new ErrorCollectorImpl();
    +
    +        LogicalExpression materializedExpr = ExpressionTreeMaterializer.materialize(expr, container, errors, optimizerContext.getFunctionRegistry());
    +        // Make sure pruneCondition's materialized expression is always of BitType, so that
    +        // it's same as the type of output vector.
    +        if (materializedExpr.getMajorType().getMode() == TypeProtos.DataMode.REQUIRED) {
    +          materializedExpr = ExpressionTreeMaterializer.convertToNullableType(
    +              materializedExpr,
    +              materializedExpr.getMajorType().getMinorType(),
    +              optimizerContext.getFunctionRegistry(),
    +              errors);
    +        }
     
    -      output.allocateNew(partitions.size());
    -      InterpreterEvaluator.evaluate(partitions.size(), optimizerContext, container, output, materializedExpr);
    -      int record = 0;
    +        if (errors.getErrorCount() != 0) {
    +          logger.warn("Failure while materializing expression [{}].  Errors: {}", expr, errors);
    +        }
     
    -      List<String> newFiles = Lists.newArrayList();
    -      for(PartitionLocation part: partitions){
    -        if(!output.getAccessor().isNull(record) && output.getAccessor().get(record) == 1){
    -          newFiles.add(part.getEntirePartitionLocation());
    +        output.allocateNew(partitions.size());
    +        InterpreterEvaluator.evaluate(partitions.size(), optimizerContext, container, output, materializedExpr);
    +        int recordCount = 0;
    +        int qualifiedCount = 0;
    +
    +        // Inner loop: within each batch iterate over the PartitionLocations
    +        for(PartitionLocation part: partitions){
    +          if(!output.getAccessor().isNull(recordCount) && output.getAccessor().get(recordCount) == 1){
    +            newFiles.add(part.getEntirePartitionLocation());
    +            qualifiedCount++;
    +          }
    +          recordCount++;
    +        }
    +        logger.debug("Within batch {}: total records: {}, qualified records: {}", batchIndex, recordCount, qualifiedCount);
    +        batchIndex++;
    +      } catch (Exception e) {
    +        logger.warn("Exception while trying to prune partition.", e);
    --- End diff --
    
    Yes, we should do an early return once an exception is caught..I will add that. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] drill pull request: DRILL-3735: For partition pruning divide up th...

Posted by jinfengni <gi...@git.apache.org>.
Github user jinfengni commented on a diff in the pull request:

    https://github.com/apache/drill/pull/156#discussion_r39465039
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java ---
    @@ -125,4 +117,16 @@ private String getBaseTableLocation() {
         final FormatSelection origSelection = (FormatSelection) scanRel.getDrillTable().getSelection();
         return origSelection.getSelection().selectionRoot;
       }
    +
    +  @Override
    +  protected void createPartitionSublists() {
    +    Set<String> fileLocations = ((ParquetGroupScan) scanRel.getGroupScan()).getFileSet();
    +    List<PartitionLocation> locations = new LinkedList<>();
    +    for (String file: fileLocations) {
    +      locations.add(new DFSPartitionLocation(MAX_NESTED_SUBDIRS, getBaseTableLocation(), file));
    --- End diff --
    
    Looks like we are still putting the file name including the directory name into heap memory, before break it into multiple sublists.  In other words, this patch will reduce the direct memory footprint allocated for value vectors. But it does not address the heap memory issue caused by very long file names, right?



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] drill pull request: DRILL-3735: For partition pruning divide up th...

Posted by jinfengni <gi...@git.apache.org>.
Github user jinfengni commented on a diff in the pull request:

    https://github.com/apache/drill/pull/156#discussion_r39463695
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java ---
    @@ -168,6 +168,7 @@ protected void doOnMatch(RelOptRuleCall call, DrillFilterRel filterRel, DrillPro
         }
    --- End diff --
    
    Remove this pair of "{", "}" ? 



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] drill pull request: DRILL-3735: For partition pruning divide up th...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/drill/pull/156


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] drill pull request: DRILL-3735: For partition pruning divide up th...

Posted by jinfengni <gi...@git.apache.org>.
Github user jinfengni commented on a diff in the pull request:

    https://github.com/apache/drill/pull/156#discussion_r39528358
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java ---
    @@ -125,4 +117,16 @@ private String getBaseTableLocation() {
         final FormatSelection origSelection = (FormatSelection) scanRel.getDrillTable().getSelection();
         return origSelection.getSelection().selectionRoot;
       }
    +
    +  @Override
    +  protected void createPartitionSublists() {
    +    Set<String> fileLocations = ((ParquetGroupScan) scanRel.getGroupScan()).getFileSet();
    +    List<PartitionLocation> locations = new LinkedList<>();
    +    for (String file: fileLocations) {
    +      locations.add(new DFSPartitionLocation(MAX_NESTED_SUBDIRS, getBaseTableLocation(), file));
    --- End diff --
    
    Yes, agreed. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---