Posted to reviews@spark.apache.org by gatorsmile <gi...@git.apache.org> on 2016/07/23 01:14:57 UTC

[GitHub] spark pull request #14322: [SPARK-16689] [SQL] FileSourceStrategy: Pruning P...

GitHub user gatorsmile opened a pull request:

    https://github.com/apache/spark/pull/14322

    [SPARK-16689] [SQL] FileSourceStrategy: Pruning Partition Columns When No Partition Column Exist in Project

    ### What changes were proposed in this pull request?
    For partitioned file sources, the current implementation always scans all the partition columns. However, this is unnecessary when the projected column list does not include any partition column; in that case we can also drop the now-redundant Project node.
    
    Below is an example:
    ```scala
    spark
      .range(N)
      .selectExpr("id AS value1", "id AS value2", "id AS p1", "id AS p2", "id AS p3")
      .toDF("value", "value2", "p1", "p2", "p3").write.format("json")
      .partitionBy("p1", "p2", "p3").save(tempDir)
    ```
    ```scala
    spark.read.format("json").load(tempDir).selectExpr("value")
    ```
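    
    The physical plans below can be printed with `explain()`; for example:
    ```scala
    // Show the physical plan for reading only the data column `value`.
    spark.read.format("json").load(tempDir).selectExpr("value").explain()
    ```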
    
    **Before the PR changes**, the physical plan is:
    ```
    == Physical Plan ==
    *Project [value#37L]
    +- *Scan json [value#37L,p1#39,p2#40,p3#41] Format: JSON, InputPaths: file:/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-f7a4294a-2e1b-4f44-9ebb-1a5eb..., PushedFilters: [], ReadSchema: struct<value:bigint>
    ```
    
    **After the PR changes**, the physical plan becomes:
    ```
    == Physical Plan ==
    *Scan json [value#147L] Format: JSON, InputPaths: file:/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-a5bcb14a-46c2-4c20-8f34-9662b..., PushedFilters: [], ReadSchema: struct<value:bigint>
    ```

    ### How was this patch tested?
    Added a test case to verify the results.
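    A minimal sketch of the kind of check the test performs (hypothetical shape; the actual test lives in the Spark test suites):
    ```scala
    val df = spark.read.format("json").load(tempDir).selectExpr("value")
    val plan = df.queryExecution.executedPlan.toString
    // After the change, the scan reads only `value` and no Project node
    // remains on top of it.
    assert(plan.contains("ReadSchema: struct<value:bigint>"))
    assert(!plan.contains("Project"))
    ```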

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/gatorsmile/spark columnPruning

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/14322.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #14322
    
----
commit c1ff0465815f6adefb2b29c2973c9bc63aa13623
Author: gatorsmile <ga...@gmail.com>
Date:   2016-07-23T00:59:14Z

    solution1

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #14322: [SPARK-16689] [SQL] FileSourceStrategy: Pruning Partitio...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on the issue:

    https://github.com/apache/spark/pull/14322
  
    Yeah, I did not see a noticeable performance difference in my local tests. The whole-stage codegen output does contain fewer instructions, so I think it helps a little, but it does not change the overall performance.
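    The codegen listings quoted later in this thread can be reproduced with the `debug` helpers; a minimal sketch, assuming Spark 2.0's `debugCodegen()`:
    ```scala
    import org.apache.spark.sql.execution.debug._

    // Print the generated Java code for each whole-stage-codegen subtree.
    spark.read.format("json").load(tempDir).selectExpr("value").debugCodegen()
    ```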




[GitHub] spark issue #14322: [SPARK-16689] [SQL] FileSourceStrategy: Pruning Partitio...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/14322
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed):
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/62743/




[GitHub] spark issue #14322: [SPARK-16689] [SQL] FileSourceStrategy: Pruning Partitio...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on the issue:

    https://github.com/apache/spark/pull/14322
  
    **Before the PR changes**, the whole-stage codegen output is:
    ```java
    == Subtree 1 / 1 ==
    *Project [value#37L]
    +- *Scan json [value#37L,p1#39,p2#40,p3#41] Format: JSON, InputPaths: file:/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-f7a4294a-2e1b-4f44-9ebb-1a5eb..., PushedFilters: [], ReadSchema: struct<value:bigint>
    
    Generated code:
    /* 001 */ public Object generate(Object[] references) {
    /* 002 */   return new GeneratedIterator(references);
    /* 003 */ }
    /* 004 */
    /* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
    /* 006 */   private Object[] references;
    /* 007 */   private org.apache.spark.sql.execution.metric.SQLMetric scan_numOutputRows;
    /* 008 */   private scala.collection.Iterator scan_input;
    /* 009 */   private UnsafeRow project_result;
    /* 010 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder project_holder;
    /* 011 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter project_rowWriter;
    /* 012 */
    /* 013 */   public GeneratedIterator(Object[] references) {
    /* 014 */     this.references = references;
    /* 015 */   }
    /* 016 */
    /* 017 */   public void init(int index, scala.collection.Iterator inputs[]) {
    /* 018 */     partitionIndex = index;
    /* 019 */     this.scan_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[0];
    /* 020 */     scan_input = inputs[0];
    /* 021 */     project_result = new UnsafeRow(1);
    /* 022 */     this.project_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(project_result, 0);
    /* 023 */     this.project_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(project_holder, 1);
    /* 024 */   }
    /* 025 */
    /* 026 */   protected void processNext() throws java.io.IOException {
    /* 027 */     while (scan_input.hasNext()) {
    /* 028 */       InternalRow scan_row = (InternalRow) scan_input.next();
    /* 029 */       scan_numOutputRows.add(1);
    /* 030 */       boolean scan_isNull4 = scan_row.isNullAt(0);
    /* 031 */       long scan_value4 = scan_isNull4 ? -1L : (scan_row.getLong(0));
    /* 032 */       project_rowWriter.zeroOutNullBytes();
    /* 033 */
    /* 034 */       if (scan_isNull4) {
    /* 035 */         project_rowWriter.setNullAt(0);
    /* 036 */       } else {
    /* 037 */         project_rowWriter.write(0, scan_value4);
    /* 038 */       }
    /* 039 */       append(project_result);
    /* 040 */       if (shouldStop()) return;
    /* 041 */     }
    /* 042 */   }
    /* 043 */ }
    ```





[GitHub] spark issue #14322: [SPARK-16689] [SQL] FileSourceStrategy: Pruning Partitio...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/14322
  
    **[Test build #62743 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/62743/consoleFull)** for PR 14322 at commit [`c1ff046`](https://github.com/apache/spark/commit/c1ff0465815f6adefb2b29c2973c9bc63aa13623).




[GitHub] spark issue #14322: [SPARK-16689] [SQL] FileSourceStrategy: Pruning Partitio...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/14322
  
    Merged build finished. Test PASSed.




[GitHub] spark pull request #14322: [SPARK-16689] [SQL] FileSourceStrategy: Pruning P...

Posted by jaceklaskowski <gi...@git.apache.org>.
Github user jaceklaskowski commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14322#discussion_r71992661
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala ---
    @@ -135,9 +135,17 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
             PUSHED_FILTERS -> pushedDownFilters.mkString("[", ", ", "]"),
             INPUT_PATHS -> fsRelation.location.paths.mkString(", "))
     
    +      // If the required attributes do not include any partitioning column, we do not need
    +      // to scan the partitioning columns. If any partitioning column is selected, the
    +      // column order of partitionColumns is fixed in the RDD, so we scan all of them.
    +      val scannedColumns = if (requiredAttributes.intersect(partitionSet).nonEmpty) {
    --- End diff --
    
    What do you think about `map` and `getOrElse`?
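    One possible reading of the suggestion, as a hypothetical sketch (the exact shape is not spelled out here):
    ```scala
    // Fold the non-empty intersection instead of branching with `if`:
    val scannedColumns = Some(requiredAttributes.intersect(partitionSet))
      .filter(_.nonEmpty)
      .map(_ => readDataColumns ++ partitionColumns)
      .getOrElse(readDataColumns)
    ```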




[GitHub] spark issue #14322: [SPARK-16689] [SQL] FileSourceStrategy: Pruning Partitio...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on the issue:

    https://github.com/apache/spark/pull/14322
  
    cc @marmbrus @cloud-fan @liancheng After checking the history, most of this code was written by you. Thanks!




[GitHub] spark issue #14322: [SPARK-16689] [SQL] FileSourceStrategy: Pruning Partitio...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/14322
  
    **[Test build #62743 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/62743/consoleFull)** for PR 14322 at commit [`c1ff046`](https://github.com/apache/spark/commit/c1ff0465815f6adefb2b29c2973c9bc63aa13623).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request #14322: [SPARK-16689] [SQL] FileSourceStrategy: Pruning P...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14322#discussion_r71965622
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala ---
    @@ -135,9 +135,17 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
             PUSHED_FILTERS -> pushedDownFilters.mkString("[", ", ", "]"),
             INPUT_PATHS -> fsRelation.location.paths.mkString(", "))
     
    +      // If the required attributes do not include any partitioning column, we do not need
    +      // to scan the partitioning columns. If any partitioning column is selected, the
    +      // column order of partitionColumns is fixed in the RDD, so we scan all of them.
    +      val scannedColumns = if (requiredAttributes.intersect(partitionSet).nonEmpty) {
    +        readDataColumns ++ partitionColumns
    +      } else {
    +        readDataColumns
    +      }
           val scan =
             DataSourceScanExec.create(
    -          readDataColumns ++ partitionColumns,
    +          scannedColumns,
    --- End diff --
    
    This is the first solution: it requires few code changes but captures most of the benefit. An alternative solution could cover more cases by changing the RDD generation.




[GitHub] spark issue #14322: [SPARK-16689] [SQL] FileSourceStrategy: Pruning Partitio...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/14322
  
    How much benefit do we get by not scanning the partition columns? It seems we just parse the directory string to get the partition values, so no I/O is needed.
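    A hypothetical illustration of that point: the partition values are encoded in the directory names written by `partitionBy`, so recovering them is string parsing rather than file I/O:
    ```scala
    // Example path written by partitionBy("p1", "p2", "p3") (made-up value).
    val path = "file:/tmp/table/p1=7/p2=7/p3=7/part-00000.json"

    // Partition values come from the key=value path segments alone.
    val partitionValues: Map[String, String] = path
      .split("/")
      .filter(_.contains("="))
      .map { segment =>
        val Array(key, value) = segment.split("=", 2)
        key -> value
      }.toMap
    // partitionValues == Map("p1" -> "7", "p2" -> "7", "p3" -> "7")
    ```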




[GitHub] spark pull request #14322: [SPARK-16689] [SQL] FileSourceStrategy: Pruning P...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile closed the pull request at:

    https://github.com/apache/spark/pull/14322




[GitHub] spark issue #14322: [SPARK-16689] [SQL] FileSourceStrategy: Pruning Partitio...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on the issue:

    https://github.com/apache/spark/pull/14322
  
    **After the PR changes**, the whole-stage codegen output is:
    ```java
    == Subtree 1 / 1 ==
    *Scan json [value#37L] Format: JSON, InputPaths: file:/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-8ac18be7-053f-4498-bf59-5ed87..., PushedFilters: [], ReadSchema: struct<value:bigint>
    
    Generated code:
    /* 001 */ public Object generate(Object[] references) {
    /* 002 */   return new GeneratedIterator(references);
    /* 003 */ }
    /* 004 */
    /* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
    /* 006 */   private Object[] references;
    /* 007 */   private org.apache.spark.sql.execution.metric.SQLMetric scan_numOutputRows;
    /* 008 */   private scala.collection.Iterator scan_input;
    /* 009 */
    /* 010 */   public GeneratedIterator(Object[] references) {
    /* 011 */     this.references = references;
    /* 012 */   }
    /* 013 */
    /* 014 */   public void init(int index, scala.collection.Iterator inputs[]) {
    /* 015 */     partitionIndex = index;
    /* 016 */     this.scan_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[0];
    /* 017 */     scan_input = inputs[0];
    /* 018 */   }
    /* 019 */
    /* 020 */   protected void processNext() throws java.io.IOException {
    /* 021 */     while (scan_input.hasNext()) {
    /* 022 */       InternalRow scan_row = (InternalRow) scan_input.next();
    /* 023 */       scan_numOutputRows.add(1);
    /* 024 */       append(scan_row);
    /* 025 */       if (shouldStop()) return;
    /* 026 */     }
    /* 027 */   }
    /* 028 */ }
    ```

