Posted to reviews@spark.apache.org by rdblue <gi...@git.apache.org> on 2018/06/06 20:26:24 UTC

[GitHub] spark pull request #21503: [SPARK-24478][SQL] Move projection and filter pus...

GitHub user rdblue opened a pull request:

    https://github.com/apache/spark/pull/21503

    [SPARK-24478][SQL] Move projection and filter push down to physical conversion

    ## What changes were proposed in this pull request?
    
    This removes the v2 optimizer rule for push-down and instead pushes filters and required columns when converting to a physical plan, as suggested by @marmbrus. This makes the v2 relation cleaner because the output and filters do not change in the logical plan.
    
    To solve the problem of getting accurate statistics in the optimizer (push-down now happens later), this adds a new trait, SupportsPhysicalStats, that calculates LeafNode stats using the filters and projection. This trait can also be implemented by v1 data sources to get more accurate stats for CBO.
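    
    For illustration, a minimal sketch of the shape such a trait could take (the signature here is a hypothetical inferred from this description; the actual patch may differ):
    
        trait SupportsPhysicalStats extends LeafNode {
          // Compute stats from the projection and filters that will be pushed
          // down when this relation is converted to a physical plan.
          def computeStats(
              projection: Seq[NamedExpression],
              filters: Seq[Expression]): Statistics
        }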
    
    The first commit was proposed in #21262. This PR replaces #21262.
    
    ## How was this patch tested?
    
    Existing tests.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/rdblue/spark SPARK-24478-move-push-down-to-physical-conversion

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21503.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #21503
    
----
commit c8517e145b1a460a8be07164c17ce20b1db86659
Author: Ryan Blue <bl...@...>
Date:   2018-05-07T20:08:02Z

    DataSourceV2: push projection, filters when converting to physical plan.

commit 9d3a11e68bca6c5a56a2be47fb09395350362ac5
Author: Ryan Blue <bl...@...>
Date:   2018-06-06T20:17:16Z

    SPARK-24478: Add trait to report stats with filters and projection.

----


---



[GitHub] spark pull request #21503: [SPARK-24478][SQL] Move projection and filter pus...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21503#discussion_r194961443
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala ---
    @@ -17,15 +17,56 @@
     
     package org.apache.spark.sql.execution.datasources.v2
     
    -import org.apache.spark.sql.Strategy
    +import org.apache.spark.sql.{execution, Strategy}
    +import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet}
    +import org.apache.spark.sql.catalyst.planning.PhysicalOperation
     import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
     import org.apache.spark.sql.execution.SparkPlan
     import org.apache.spark.sql.execution.streaming.continuous.{WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
     
     object DataSourceV2Strategy extends Strategy {
       override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    -    case r: DataSourceV2Relation =>
    -      DataSourceV2ScanExec(r.output, r.source, r.options, r.pushedFilters, r.reader) :: Nil
    +    case PhysicalOperation(project, filters, relation: DataSourceV2Relation) =>
    +      val projectSet = AttributeSet(project.flatMap(_.references))
    +      val filterSet = AttributeSet(filters.flatMap(_.references))
    +
    +      val projection = if (filterSet.subsetOf(projectSet) &&
    +          AttributeSet(relation.output) == projectSet) {
    +        // When the required projection contains all of the filter columns and column pruning alone
    +        // can produce the required projection, push the required projection.
    +        // A final projection may still be needed if the data source produces a different column
    +        // order or if it cannot prune all of the nested columns.
    +        relation.output
    +      } else {
    +        // When there are filter columns not already in the required projection or when the required
    +        // projection is more complicated than column pruning, base column pruning on the set of
    +        // all columns needed by both.
    +        (projectSet ++ filterSet).toSeq
    +      }
    +
    +      val reader = relation.newReader
    --- End diff --
    
    @rdblue do you have time to prepare a PR for the second proposal? I can do that too if you are busy with other stuff.


---



[GitHub] spark pull request #21503: [SPARK-24478][SQL] Move projection and filter pus...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21503#discussion_r194839473
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala ---
    @@ -17,15 +17,56 @@
     
     package org.apache.spark.sql.execution.datasources.v2
     
    -import org.apache.spark.sql.Strategy
    +import org.apache.spark.sql.{execution, Strategy}
    +import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet}
    +import org.apache.spark.sql.catalyst.planning.PhysicalOperation
     import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
     import org.apache.spark.sql.execution.SparkPlan
     import org.apache.spark.sql.execution.streaming.continuous.{WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
     
     object DataSourceV2Strategy extends Strategy {
       override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    -    case r: DataSourceV2Relation =>
    -      DataSourceV2ScanExec(r.output, r.source, r.options, r.pushedFilters, r.reader) :: Nil
    +    case PhysicalOperation(project, filters, relation: DataSourceV2Relation) =>
    +      val projectSet = AttributeSet(project.flatMap(_.references))
    +      val filterSet = AttributeSet(filters.flatMap(_.references))
    +
    +      val projection = if (filterSet.subsetOf(projectSet) &&
    +          AttributeSet(relation.output) == projectSet) {
    +        // When the required projection contains all of the filter columns and column pruning alone
    +        // can produce the required projection, push the required projection.
    +        // A final projection may still be needed if the data source produces a different column
    +        // order or if it cannot prune all of the nested columns.
    +        relation.output
    +      } else {
    +        // When there are filter columns not already in the required projection or when the required
    +        // projection is more complicated than column pruning, base column pruning on the set of
    +        // all columns needed by both.
    +        (projectSet ++ filterSet).toSeq
    +      }
    +
    +      val reader = relation.newReader
    --- End diff --
    
    it's nice to decouple the problem and do pushdown during planning, but I feel the cost is too high in this approach. For file-based data sources, we need to query the Hive metastore to apply partition pruning during filter pushdown, and this can be very expensive. Doing it twice looks scary to me.
    
    cc @gatorsmile @dongjoon-hyun @mallman , please correct me if I have a wrong understanding.
    
    also cc @wzhfy do you have an estimate of how long it would take to move statistics to the physical plan?


---



[GitHub] spark issue #21503: [SPARK-24478][SQL] Move projection and filter push down ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21503
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/98/
    Test PASSed.


---



[GitHub] spark issue #21503: [SPARK-24478][SQL] Move projection and filter push down ...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on the issue:

    https://github.com/apache/spark/pull/21503
  
    @cloud-fan, tests are passing for c8517e145b1a460a8be07164c17ce20b1db86659, which has all of the functional changes. The Jenkins job ran out of memory for the last commit, but the only change in it is in comments to add the note you requested. Should be good to go.


---



[GitHub] spark issue #21503: [SPARK-24478][SQL] Move projection and filter push down ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21503
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21503: [SPARK-24478][SQL] Move projection and filter push down ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21503
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3994/
    Test PASSed.


---



[GitHub] spark issue #21503: [SPARK-24478][SQL] Move projection and filter push down ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/21503
  
    Let's also update the classdoc of `SupportsReportStatistics` to mention that Spark currently calls it before any operator pushdown, which effectively means the returned statistics are for a full scan.
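    
    For example, something along these lines (just a sketch of the wording, not the final classdoc):
    
        /**
         * ...existing classdoc...
         *
         * Note: currently Spark calls this before any operator pushdown, which
         * effectively means the returned statistics are for a full scan.
         */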


---



[GitHub] spark issue #21503: [SPARK-24478][SQL] Move projection and filter push down ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21503
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3825/
    Test PASSed.


---



[GitHub] spark issue #21503: [SPARK-24478][SQL] Move projection and filter push down ...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on the issue:

    https://github.com/apache/spark/pull/21503
  
    Retest this please.


---



[GitHub] spark issue #21503: [SPARK-24478][SQL] Move projection and filter push down ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21503
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3989/
    Test PASSed.


---



[GitHub] spark issue #21503: [SPARK-24478][SQL] Move projection and filter push down ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21503
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21503: [SPARK-24478][SQL] Move projection and filter push down ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21503
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/103/
    Test PASSed.


---



[GitHub] spark pull request #21503: [SPARK-24478][SQL] Move projection and filter pus...

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21503#discussion_r194953803
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala ---
    @@ -17,15 +17,56 @@
     
     package org.apache.spark.sql.execution.datasources.v2
     
    -import org.apache.spark.sql.Strategy
    +import org.apache.spark.sql.{execution, Strategy}
    +import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet}
    +import org.apache.spark.sql.catalyst.planning.PhysicalOperation
     import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
     import org.apache.spark.sql.execution.SparkPlan
     import org.apache.spark.sql.execution.streaming.continuous.{WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
     
     object DataSourceV2Strategy extends Strategy {
       override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    -    case r: DataSourceV2Relation =>
    -      DataSourceV2ScanExec(r.output, r.source, r.options, r.pushedFilters, r.reader) :: Nil
    +    case PhysicalOperation(project, filters, relation: DataSourceV2Relation) =>
    +      val projectSet = AttributeSet(project.flatMap(_.references))
    +      val filterSet = AttributeSet(filters.flatMap(_.references))
    +
    +      val projection = if (filterSet.subsetOf(projectSet) &&
    +          AttributeSet(relation.output) == projectSet) {
    +        // When the required projection contains all of the filter columns and column pruning alone
    +        // can produce the required projection, push the required projection.
    +        // A final projection may still be needed if the data source produces a different column
    +        // order or if it cannot prune all of the nested columns.
    +        relation.output
    +      } else {
    +        // When there are filter columns not already in the required projection or when the required
    +        // projection is more complicated than column pruning, base column pruning on the set of
    +        // all columns needed by both.
    +        (projectSet ++ filterSet).toSeq
    +      }
    +
    +      val reader = relation.newReader
    --- End diff --
    
    I'm not strongly opposed to any of the options, but 2 would be my choice if I had to pick one. A temporary state where functionality is missing is easier to reason about than temporary states where we deliberately impose a fuzzy lifecycle.


---



[GitHub] spark pull request #21503: [SPARK-24478][SQL] Move projection and filter pus...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21503#discussion_r195170789
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala ---
    @@ -32,79 +31,35 @@ import org.apache.spark.sql.types.StructType
     
     case class DataSourceV2Relation(
         source: DataSourceV2,
    +    output: Seq[AttributeReference],
         options: Map[String, String],
    -    projection: Seq[AttributeReference],
    -    filters: Option[Seq[Expression]] = None,
         userSpecifiedSchema: Option[StructType] = None)
       extends LeafNode with MultiInstanceRelation with DataSourceV2StringFormat {
     
       import DataSourceV2Relation._
     
    -  override def simpleString: String = "RelationV2 " + metadataString
    -
    -  override lazy val schema: StructType = reader.readSchema()
    -
    -  override lazy val output: Seq[AttributeReference] = {
    -    // use the projection attributes to avoid assigning new ids. fields that are not projected
    -    // will be assigned new ids, which is okay because they are not projected.
    -    val attrMap = projection.map(a => a.name -> a).toMap
    -    schema.map(f => attrMap.getOrElse(f.name,
    -      AttributeReference(f.name, f.dataType, f.nullable, f.metadata)()))
    -  }
    -
    -  private lazy val v2Options: DataSourceOptions = makeV2Options(options)
    +  override def pushedFilters: Seq[Expression] = Seq.empty
     
    -  // postScanFilters: filters that need to be evaluated after the scan.
    -  // pushedFilters: filters that will be pushed down and evaluated in the underlying data sources.
    -  // Note: postScanFilters and pushedFilters can overlap, e.g. the parquet row group filter.
    -  lazy val (
    -      reader: DataSourceReader,
    -      postScanFilters: Seq[Expression],
    -      pushedFilters: Seq[Expression]) = {
    -    val newReader = userSpecifiedSchema match {
    -      case Some(s) =>
    -        source.asReadSupportWithSchema.createReader(s, v2Options)
    -      case _ =>
    -        source.asReadSupport.createReader(v2Options)
    -    }
    -
    -    DataSourceV2Relation.pushRequiredColumns(newReader, projection.toStructType)
    -
    -    val (postScanFilters, pushedFilters) = filters match {
    -      case Some(filterSeq) =>
    -        DataSourceV2Relation.pushFilters(newReader, filterSeq)
    -      case _ =>
    -        (Nil, Nil)
    -    }
    -    logInfo(s"Post-Scan Filters: ${postScanFilters.mkString(",")}")
    -    logInfo(s"Pushed Filters: ${pushedFilters.mkString(", ")}")
    -
    -    (newReader, postScanFilters, pushedFilters)
    -  }
    -
    -  override def doCanonicalize(): LogicalPlan = {
    -    val c = super.doCanonicalize().asInstanceOf[DataSourceV2Relation]
    +  override def simpleString: String = "RelationV2 " + metadataString
     
    -    // override output with canonicalized output to avoid attempting to configure a reader
    -    val canonicalOutput: Seq[AttributeReference] = this.output
    -        .map(a => QueryPlan.normalizeExprId(a, projection))
    +  lazy val v2Options: DataSourceOptions = makeV2Options(options)
     
    -    new DataSourceV2Relation(c.source, c.options, c.projection) {
    -      override lazy val output: Seq[AttributeReference] = canonicalOutput
    -    }
    +  def newReader: DataSourceReader = userSpecifiedSchema match {
    +    case Some(userSchema) =>
    +      source.asReadSupportWithSchema.createReader(userSchema, v2Options)
    +    case None =>
    +      source.asReadSupport.createReader(v2Options)
       }
     
    -  override def computeStats(): Statistics = reader match {
    +  override def computeStats(): Statistics = newReader match {
         case r: SupportsReportStatistics =>
           Statistics(sizeInBytes = r.getStatistics.sizeInBytes().orElse(conf.defaultSizeInBytes))
         case _ =>
           Statistics(sizeInBytes = conf.defaultSizeInBytes)
       }
     
       override def newInstance(): DataSourceV2Relation = {
    --- End diff --
    
    I don't think we need to override `newInstance` now.


---



[GitHub] spark issue #21503: [SPARK-24478][SQL] Move projection and filter push down ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21503
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21503: [SPARK-24478][SQL] Move projection and filter push down ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21503
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21503: [SPARK-24478][SQL] Move projection and filter push down ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/21503
  
    thanks, merging to master!


---



[GitHub] spark issue #21503: [SPARK-24478][SQL] Move projection and filter push down ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21503
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21503: [SPARK-24478][SQL] Move projection and filter push down ...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on the issue:

    https://github.com/apache/spark/pull/21503
  
    @cloud-fan, this is the PR for moving push-down to the physical plan conversion and reporting the stats correctly. Sorry for the confusion; I sent a link to just the second commit.


---



[GitHub] spark issue #21503: [SPARK-24478][SQL] Move projection and filter push down ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21503
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91507/
    Test PASSed.


---



[GitHub] spark issue #21503: [SPARK-24478][SQL] Move projection and filter push down ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21503
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21503: [SPARK-24478][SQL] Move projection and filter push down ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/21503
  
    cc @rxin if you are interested.


---



[GitHub] spark issue #21503: [SPARK-24478][SQL] Move projection and filter push down ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21503
  
    **[Test build #91507 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91507/testReport)** for PR 21503 at commit [`9d3a11e`](https://github.com/apache/spark/commit/9d3a11e68bca6c5a56a2be47fb09395350362ac5).


---



[GitHub] spark pull request #21503: [SPARK-24478][SQL] Move projection and filter pus...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21503#discussion_r194829647
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala ---
    @@ -17,15 +17,56 @@
     
     package org.apache.spark.sql.execution.datasources.v2
     
    -import org.apache.spark.sql.Strategy
    +import org.apache.spark.sql.{execution, Strategy}
    +import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet}
    +import org.apache.spark.sql.catalyst.planning.PhysicalOperation
     import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
     import org.apache.spark.sql.execution.SparkPlan
     import org.apache.spark.sql.execution.streaming.continuous.{WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
     
     object DataSourceV2Strategy extends Strategy {
       override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    -    case r: DataSourceV2Relation =>
    -      DataSourceV2ScanExec(r.output, r.source, r.options, r.pushedFilters, r.reader) :: Nil
    +    case PhysicalOperation(project, filters, relation: DataSourceV2Relation) =>
    +      val projectSet = AttributeSet(project.flatMap(_.references))
    +      val filterSet = AttributeSet(filters.flatMap(_.references))
    +
    +      val projection = if (filterSet.subsetOf(projectSet) &&
    +          AttributeSet(relation.output) == projectSet) {
    +        // When the required projection contains all of the filter columns and column pruning alone
    +        // can produce the required projection, push the required projection.
    +        // A final projection may still be needed if the data source produces a different column
    +        // order or if it cannot prune all of the nested columns.
    +        relation.output
    +      } else {
    +        // When there are filter columns not already in the required projection or when the required
    +        // projection is more complicated than column pruning, base column pruning on the set of
    +        // all columns needed by both.
    +        (projectSet ++ filterSet).toSeq
    +      }
    +
    +      val reader = relation.newReader
    --- End diff --
    
    It will configure two readers: one for pushdown when converting to a physical plan, and one for stats. The stats reader should be temporary, though, since we want to address that problem separately. Configuring two readers instead of one lets us decouple the problems so we can move forward with pushdown that works like the other data sources.
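    
    Roughly, the pattern looks like this (simplified from the diff in this PR):
    
        // 1. In the logical plan: a fresh reader, used only to compute stats.
        override def computeStats(): Statistics = newReader match {
          case r: SupportsReportStatistics =>
            Statistics(sizeInBytes = r.getStatistics.sizeInBytes().orElse(conf.defaultSizeInBytes))
          case _ =>
            Statistics(sizeInBytes = conf.defaultSizeInBytes)
        }
    
        // 2. In DataSourceV2Strategy: a second reader that receives the pushed
        //    filters and required columns for the physical scan.
        val reader = relation.newReader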


---



[GitHub] spark issue #21503: [SPARK-24478][SQL] Move projection and filter push down ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21503
  
    **[Test build #91507 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91507/testReport)** for PR 21503 at commit [`9d3a11e`](https://github.com/apache/spark/commit/9d3a11e68bca6c5a56a2be47fb09395350362ac5).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `trait SupportsPhysicalStats extends LeafNode `


---



[GitHub] spark pull request #21503: [SPARK-24478][SQL] Move projection and filter pus...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21503#discussion_r194858392
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala ---
    @@ -17,15 +17,56 @@
     
     package org.apache.spark.sql.execution.datasources.v2
     
    -import org.apache.spark.sql.Strategy
    +import org.apache.spark.sql.{execution, Strategy}
    +import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet}
    +import org.apache.spark.sql.catalyst.planning.PhysicalOperation
     import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
     import org.apache.spark.sql.execution.SparkPlan
     import org.apache.spark.sql.execution.streaming.continuous.{WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
     
     object DataSourceV2Strategy extends Strategy {
       override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    -    case r: DataSourceV2Relation =>
    -      DataSourceV2ScanExec(r.output, r.source, r.options, r.pushedFilters, r.reader) :: Nil
    +    case PhysicalOperation(project, filters, relation: DataSourceV2Relation) =>
    +      val projectSet = AttributeSet(project.flatMap(_.references))
    +      val filterSet = AttributeSet(filters.flatMap(_.references))
    +
    +      val projection = if (filterSet.subsetOf(projectSet) &&
    +          AttributeSet(relation.output) == projectSet) {
    +        // When the required projection contains all of the filter columns and column pruning alone
    +        // can produce the required projection, push the required projection.
    +        // A final projection may still be needed if the data source produces a different column
    +        // order or if it cannot prune all of the nested columns.
    +        relation.output
    +      } else {
    +        // When there are filter columns not already in the required projection or when the required
    +        // projection is more complicated than column pruning, base column pruning on the set of
    +        // all columns needed by both.
    +        (projectSet ++ filterSet).toSeq
    +      }
    +
    +      val reader = relation.newReader
    --- End diff --
    
    > there's nothing forcing other data sources to implement the new trait ...
    
    hmmm, I'm a little confused here. All v2 data sources would have to apply pushdown twice, right? Or are you suggesting we should not migrate the file-based data sources to data source v2?


---



[GitHub] spark pull request #21503: [SPARK-24478][SQL] Move projection and filter pus...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21503#discussion_r194861645
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala ---
    @@ -17,15 +17,56 @@
     
     package org.apache.spark.sql.execution.datasources.v2
     
    -import org.apache.spark.sql.Strategy
    +import org.apache.spark.sql.{execution, Strategy}
    +import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet}
    +import org.apache.spark.sql.catalyst.planning.PhysicalOperation
     import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
     import org.apache.spark.sql.execution.SparkPlan
     import org.apache.spark.sql.execution.streaming.continuous.{WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
     
     object DataSourceV2Strategy extends Strategy {
       override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    -    case r: DataSourceV2Relation =>
    -      DataSourceV2ScanExec(r.output, r.source, r.options, r.pushedFilters, r.reader) :: Nil
    +    case PhysicalOperation(project, filters, relation: DataSourceV2Relation) =>
    +      val projectSet = AttributeSet(project.flatMap(_.references))
    +      val filterSet = AttributeSet(filters.flatMap(_.references))
    +
    +      val projection = if (filterSet.subsetOf(projectSet) &&
    +          AttributeSet(relation.output) == projectSet) {
    +        // When the required projection contains all of the filter columns and column pruning alone
    +        // can produce the required projection, push the required projection.
    +        // A final projection may still be needed if the data source produces a different column
    +        // order or if it cannot prune all of the nested columns.
    +        relation.output
    +      } else {
    +        // When there are filter columns not already in the required projection or when the required
    +        // projection is more complicated than column pruning, base column pruning on the set of
    +        // all columns needed by both.
    +        (projectSet ++ filterSet).toSeq
    +      }
    +
    +      val reader = relation.newReader
    --- End diff --
    
    I didn't realize you were talking about other v2 sources. Yes, two readers would be configured for v2. If you wanted to avoid that, the implementation could cache its reader when pushdown is expensive, or we could add something else that prevents that case.
    
    We need to do *something* to fix the current behavior of doing pushdown in the optimizer. I'm perfectly happy with less accurate stats for v2 until stats use the physical plan, or a solution like this where pushdown happens twice. I just don't think it is a good idea to continue with the design where the logical plan needs to use the v2 reader APIs. I think we agree that that should happen once, and conversion to physical plan is where it makes the most sense.
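    
    A sketch of the caching idea, purely hypothetical and not part of this patch (`buildReader` stands in for whatever expensive setup a source does):
    
        class MyExpensiveSource extends DataSourceV2 with ReadSupport {
          // Reuse one configured reader so the expensive setup runs only once.
          // (Ignores `options` for brevity; a real source would key on them.)
          private lazy val cached: DataSourceReader = buildReader()
          override def createReader(options: DataSourceOptions): DataSourceReader = cached
        }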


---



[GitHub] spark pull request #21503: [SPARK-24478][SQL] Move projection and filter pus...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21503#discussion_r194871032
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala ---
    @@ -17,15 +17,56 @@
     
     package org.apache.spark.sql.execution.datasources.v2
     
    -import org.apache.spark.sql.Strategy
    +import org.apache.spark.sql.{execution, Strategy}
    +import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet}
    +import org.apache.spark.sql.catalyst.planning.PhysicalOperation
     import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
     import org.apache.spark.sql.execution.SparkPlan
     import org.apache.spark.sql.execution.streaming.continuous.{WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
     
     object DataSourceV2Strategy extends Strategy {
       override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    -    case r: DataSourceV2Relation =>
    -      DataSourceV2ScanExec(r.output, r.source, r.options, r.pushedFilters, r.reader) :: Nil
    +    case PhysicalOperation(project, filters, relation: DataSourceV2Relation) =>
    +      val projectSet = AttributeSet(project.flatMap(_.references))
    +      val filterSet = AttributeSet(filters.flatMap(_.references))
    +
    +      val projection = if (filterSet.subsetOf(projectSet) &&
    +          AttributeSet(relation.output) == projectSet) {
    +        // When the required projection contains all of the filter columns and column pruning alone
    +        // can produce the required projection, push the required projection.
    +        // A final projection may still be needed if the data source produces a different column
    +        // order or if it cannot prune all of the nested columns.
    +        relation.output
    +      } else {
    +        // When there are filter columns not already in the required projection or when the required
    +        // projection is more complicated than column pruning, base column pruning on the set of
    +        // all columns needed by both.
    +        (projectSet ++ filterSet).toSeq
    +      }
    +
    +      val reader = relation.newReader
    --- End diff --
    
    OK we need to make a decision here:
    1. Apply operator pushdown twice (proposed by this PR). This moves the pushdown logic to the planner, which is more ideal and cleaner. The drawback is, before moving statistics to the physical plan, we have some duplicated pushdown code in `DataSourceV2Relation`, and applying pushdown twice has a performance penalty.
    2. Apply operator pushdown only once, in the planner. Same as 1, it's cleaner. The drawback is, before moving statistics to the physical plan, data source v2 can't report statistics that account for pushed filters.
    3. Apply operator pushdown only once, in the optimizer (proposed by https://github.com/apache/spark/pull/21319). It has no performance penalty and we can report statistics that account for pushed filters. The drawback is, before moving statistics to the physical plan, we have a temporary `DataSourceReader` in `DataSourceV2Relation`, which is hacky.
    
    The tradeoff is: shall we bear with hacky code and move forward with data source v2 operator pushdown support? Or shall we make the code cleaner and bear with some performance penalty (apply pushdown twice, or not report stats after filters)? Or shall we just hold back and think about how to move stats to the physical plan?
    
    cc @marmbrus @jose-torres 


---



[GitHub] spark pull request #21503: [SPARK-24478][SQL] Move projection and filter pus...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21503#discussion_r195138932
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala ---
    @@ -17,15 +17,56 @@
     
     package org.apache.spark.sql.execution.datasources.v2
     
    -import org.apache.spark.sql.Strategy
    +import org.apache.spark.sql.{execution, Strategy}
    +import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet}
    +import org.apache.spark.sql.catalyst.planning.PhysicalOperation
     import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
     import org.apache.spark.sql.execution.SparkPlan
     import org.apache.spark.sql.execution.streaming.continuous.{WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
     
     object DataSourceV2Strategy extends Strategy {
       override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    -    case r: DataSourceV2Relation =>
    -      DataSourceV2ScanExec(r.output, r.source, r.options, r.pushedFilters, r.reader) :: Nil
    +    case PhysicalOperation(project, filters, relation: DataSourceV2Relation) =>
    +      val projectSet = AttributeSet(project.flatMap(_.references))
    +      val filterSet = AttributeSet(filters.flatMap(_.references))
    +
    +      val projection = if (filterSet.subsetOf(projectSet) &&
    +          AttributeSet(relation.output) == projectSet) {
    +        // When the required projection contains all of the filter columns and column pruning alone
    +        // can produce the required projection, push the required projection.
    +        // A final projection may still be needed if the data source produces a different column
    +        // order or if it cannot prune all of the nested columns.
    +        relation.output
    +      } else {
    +        // When there are filter columns not already in the required projection or when the required
    +        // projection is more complicated than column pruning, base column pruning on the set of
    +        // all columns needed by both.
    +        (projectSet ++ filterSet).toSeq
    +      }
    +
    +      val reader = relation.newReader
    --- End diff --
    
    Yeah, no problem. I can just remove the stats commit from this one.


---



[GitHub] spark issue #21503: [SPARK-24478][SQL] Move projection and filter push down ...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on the issue:

    https://github.com/apache/spark/pull/21503
  
    Updated the stats interface.


---



[GitHub] spark issue #21503: [SPARK-24478][SQL] Move projection and filter push down ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21503
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #21503: [SPARK-24478][SQL] Move projection and filter push down ...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on the issue:

    https://github.com/apache/spark/pull/21503
  
    Thank you for reviewing this, @cloud-fan!


---



[GitHub] spark issue #21503: [SPARK-24478][SQL] Move projection and filter push down ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21503
  
    **[Test build #91789 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91789/testReport)** for PR 21503 at commit [`d5caf83`](https://github.com/apache/spark/commit/d5caf83e08792ba9d4163ac969a3675ce97fe698).
     * This patch **fails to build**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #21503: [SPARK-24478][SQL] Move projection and filter pus...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/21503


---



[GitHub] spark issue #21503: [SPARK-24478][SQL] Move projection and filter push down ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21503
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91789/
    Test FAILed.


---



[GitHub] spark pull request #21503: [SPARK-24478][SQL] Move projection and filter pus...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21503#discussion_r194841328
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala ---
    @@ -17,15 +17,56 @@
     
     package org.apache.spark.sql.execution.datasources.v2
     
    -import org.apache.spark.sql.Strategy
    +import org.apache.spark.sql.{execution, Strategy}
    +import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet}
    +import org.apache.spark.sql.catalyst.planning.PhysicalOperation
     import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
     import org.apache.spark.sql.execution.SparkPlan
     import org.apache.spark.sql.execution.streaming.continuous.{WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
     
     object DataSourceV2Strategy extends Strategy {
       override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    -    case r: DataSourceV2Relation =>
    -      DataSourceV2ScanExec(r.output, r.source, r.options, r.pushedFilters, r.reader) :: Nil
    +    case PhysicalOperation(project, filters, relation: DataSourceV2Relation) =>
    +      val projectSet = AttributeSet(project.flatMap(_.references))
    +      val filterSet = AttributeSet(filters.flatMap(_.references))
    +
    +      val projection = if (filterSet.subsetOf(projectSet) &&
    +          AttributeSet(relation.output) == projectSet) {
    +        // When the required projection contains all of the filter columns and column pruning alone
    +        // can produce the required projection, push the required projection.
    +        // A final projection may still be needed if the data source produces a different column
    +        // order or if it cannot prune all of the nested columns.
    +        relation.output
    +      } else {
    +        // When there are filter columns not already in the required projection or when the required
    +        // projection is more complicated than column pruning, base column pruning on the set of
    +        // all columns needed by both.
    +        (projectSet ++ filterSet).toSeq
    +      }
    +
    +      val reader = relation.newReader
    --- End diff --
    
    @cloud-fan, there's nothing forcing other data sources to implement the new trait. Other sources can continue to report stats for the entire table and not account for filters (the code assumes that row counts don't change). This just opens the option of reporting stats that are more accurate using the filters and projection that will be pushed.
    
    Ideally, I think that stats-based decisions would happen after pushdown so we get data that is as accurate as possible. But for now, this fixes the regression for v2 sources that happens because we move pushdown to a later step (conversion to physical plan like the other sources).


---



[GitHub] spark pull request #21503: [SPARK-24478][SQL] Move projection and filter pus...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21503#discussion_r194895594
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala ---
    @@ -17,15 +17,56 @@
     
     package org.apache.spark.sql.execution.datasources.v2
     
    -import org.apache.spark.sql.Strategy
    +import org.apache.spark.sql.{execution, Strategy}
    +import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet}
    +import org.apache.spark.sql.catalyst.planning.PhysicalOperation
     import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
     import org.apache.spark.sql.execution.SparkPlan
     import org.apache.spark.sql.execution.streaming.continuous.{WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
     
     object DataSourceV2Strategy extends Strategy {
       override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    -    case r: DataSourceV2Relation =>
    -      DataSourceV2ScanExec(r.output, r.source, r.options, r.pushedFilters, r.reader) :: Nil
    +    case PhysicalOperation(project, filters, relation: DataSourceV2Relation) =>
    +      val projectSet = AttributeSet(project.flatMap(_.references))
    +      val filterSet = AttributeSet(filters.flatMap(_.references))
    +
    +      val projection = if (filterSet.subsetOf(projectSet) &&
    +          AttributeSet(relation.output) == projectSet) {
    +        // When the required projection contains all of the filter columns and column pruning alone
    +        // can produce the required projection, push the required projection.
    +        // A final projection may still be needed if the data source produces a different column
    +        // order or if it cannot prune all of the nested columns.
    +        relation.output
    +      } else {
    +        // When there are filter columns not already in the required projection or when the required
    +        // projection is more complicated than column pruning, base column pruning on the set of
    +        // all columns needed by both.
    +        (projectSet ++ filterSet).toSeq
    +      }
    +
    +      val reader = relation.newReader
    --- End diff --
    
    Yea, the second proposal is what happens for the v1 data sources. For file-based data sources we kind of picked the third proposal and added an optimizer rule `PruneFileSourcePartitions` to push down some of the filters to the data source at the logical phase, to get precise stats.
    
    I'd like to pick from the 2nd and 3rd proposals (the 3rd proposal is also temporary, before we move stats to the physical plan). Applying pushdown twice is hard to work around (we would need to cache), while we can keep the `PruneFileSourcePartitions` rule to work around the issue in the 2nd proposal for file-based data sources.
    
    Let's also get more input from other people.


---



[GitHub] spark issue #21503: [SPARK-24478][SQL] Move projection and filter push down ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21503
  
    **[Test build #91782 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91782/testReport)** for PR 21503 at commit [`c8517e1`](https://github.com/apache/spark/commit/c8517e145b1a460a8be07164c17ce20b1db86659).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #21503: [SPARK-24478][SQL] Move projection and filter push down ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21503
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91782/
    Test PASSed.


---



[GitHub] spark pull request #21503: [SPARK-24478][SQL] Move projection and filter pus...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21503#discussion_r194875888
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala ---
    @@ -17,15 +17,56 @@
     
     package org.apache.spark.sql.execution.datasources.v2
     
    -import org.apache.spark.sql.Strategy
    +import org.apache.spark.sql.{execution, Strategy}
    +import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet}
    +import org.apache.spark.sql.catalyst.planning.PhysicalOperation
     import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
     import org.apache.spark.sql.execution.SparkPlan
     import org.apache.spark.sql.execution.streaming.continuous.{WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
     
     object DataSourceV2Strategy extends Strategy {
       override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    -    case r: DataSourceV2Relation =>
    -      DataSourceV2ScanExec(r.output, r.source, r.options, r.pushedFilters, r.reader) :: Nil
    +    case PhysicalOperation(project, filters, relation: DataSourceV2Relation) =>
    +      val projectSet = AttributeSet(project.flatMap(_.references))
    +      val filterSet = AttributeSet(filters.flatMap(_.references))
    +
    +      val projection = if (filterSet.subsetOf(projectSet) &&
    +          AttributeSet(relation.output) == projectSet) {
    +        // When the required projection contains all of the filter columns and column pruning alone
    +        // can produce the required projection, push the required projection.
    +        // A final projection may still be needed if the data source produces a different column
    +        // order or if it cannot prune all of the nested columns.
    +        relation.output
    +      } else {
    +        // When there are filter columns not already in the required projection or when the required
    +        // projection is more complicated than column pruning, base column pruning on the set of
    +        // all columns needed by both.
    +        (projectSet ++ filterSet).toSeq
    +      }
    +
    +      val reader = relation.newReader
    --- End diff --
    
    I don't mind either option #1 or #2. #2 is basically what happens for non-v2 data sources right now. Plus, both should be temporary.
    
    I think it is a bad idea to continue with hacky code that uses the reader in the logical plan. It is much cleaner otherwise, and we've spent too much time making sure that everything still works. The main example that comes to mind is setting the requested projection and then determining the relation's output from the pushdown result. I think hacks are slowing progress on the v2 sources.


---


[GitHub] spark pull request #21503: [SPARK-24478][SQL] Move projection and filter pus...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21503#discussion_r194828220
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala ---
    @@ -17,15 +17,56 @@
     
     package org.apache.spark.sql.execution.datasources.v2
     
    -import org.apache.spark.sql.Strategy
    +import org.apache.spark.sql.{execution, Strategy}
    +import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet}
    +import org.apache.spark.sql.catalyst.planning.PhysicalOperation
     import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
     import org.apache.spark.sql.execution.SparkPlan
     import org.apache.spark.sql.execution.streaming.continuous.{WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
     
     object DataSourceV2Strategy extends Strategy {
       override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    -    case r: DataSourceV2Relation =>
    -      DataSourceV2ScanExec(r.output, r.source, r.options, r.pushedFilters, r.reader) :: Nil
    +    case PhysicalOperation(project, filters, relation: DataSourceV2Relation) =>
    +      val projectSet = AttributeSet(project.flatMap(_.references))
    +      val filterSet = AttributeSet(filters.flatMap(_.references))
    +
    +      val projection = if (filterSet.subsetOf(projectSet) &&
    +          AttributeSet(relation.output) == projectSet) {
    +        // When the required projection contains all of the filter columns and column pruning alone
    +        // can produce the required projection, push the required projection.
    +        // A final projection may still be needed if the data source produces a different column
    +        // order or if it cannot prune all of the nested columns.
    +        relation.output
    +      } else {
    +        // When there are filter columns not already in the required projection or when the required
    +        // projection is more complicated than column pruning, base column pruning on the set of
    +        // all columns needed by both.
    +        (projectSet ++ filterSet).toSeq
    +      }
    +
    +      val reader = relation.newReader
    --- End diff --
    
    To confirm: do we have to do operator pushdown twice now? Once in the plan visitor to calculate statistics, and once here to build the physical plan, right?
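
    For readers following along, a minimal sketch of the push-down step both call sites would run, each on a fresh reader from `relation.newReader` (assumes the standard v2 reader mix-ins; translating Catalyst expressions to data source `Filter`s is elided):

        import org.apache.spark.sql.sources.Filter
        import org.apache.spark.sql.sources.v2.reader.{DataSourceReader,
          SupportsPushDownFilters, SupportsPushDownRequiredColumns}
        import org.apache.spark.sql.types.StructType

        // Configure one freshly created reader. Under this design the same
        // work happens twice: once on the reader used for statistics and
        // once on the reader used to build the physical scan.
        def pushDown(
            reader: DataSourceReader,
            requiredSchema: StructType,
            translated: Array[Filter]): Array[Filter] = {
          reader match {
            case r: SupportsPushDownRequiredColumns => r.pruneColumns(requiredSchema)
            case _ =>
          }
          reader match {
            case r: SupportsPushDownFilters =>
              // pushFilters returns the filters the source could not handle;
              // Spark must still evaluate those after the scan.
              r.pushFilters(translated)
            case _ => translated
          }
        }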


---


[GitHub] spark issue #21503: [SPARK-24478][SQL] Move projection and filter push down ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21503
  
    Merged build finished. Test PASSed.


---


[GitHub] spark issue #21503: [SPARK-24478][SQL] Move projection and filter push down ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21503
  
    **[Test build #91782 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91782/testReport)** for PR 21503 at commit [`c8517e1`](https://github.com/apache/spark/commit/c8517e145b1a460a8be07164c17ce20b1db86659).


---


[GitHub] spark issue #21503: [SPARK-24478][SQL] Move projection and filter push down ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21503
  
    **[Test build #91789 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91789/testReport)** for PR 21503 at commit [`d5caf83`](https://github.com/apache/spark/commit/d5caf83e08792ba9d4163ac969a3675ce97fe698).


---


[GitHub] spark pull request #21503: [SPARK-24478][SQL] Move projection and filter pus...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21503#discussion_r195173700
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala ---
    @@ -32,79 +31,35 @@ import org.apache.spark.sql.types.StructType
     
     case class DataSourceV2Relation(
         source: DataSourceV2,
    +    output: Seq[AttributeReference],
         options: Map[String, String],
    -    projection: Seq[AttributeReference],
    -    filters: Option[Seq[Expression]] = None,
         userSpecifiedSchema: Option[StructType] = None)
       extends LeafNode with MultiInstanceRelation with DataSourceV2StringFormat {
     
       import DataSourceV2Relation._
     
    -  override def simpleString: String = "RelationV2 " + metadataString
    -
    -  override lazy val schema: StructType = reader.readSchema()
    -
    -  override lazy val output: Seq[AttributeReference] = {
    -    // use the projection attributes to avoid assigning new ids. fields that are not projected
    -    // will be assigned new ids, which is okay because they are not projected.
    -    val attrMap = projection.map(a => a.name -> a).toMap
    -    schema.map(f => attrMap.getOrElse(f.name,
    -      AttributeReference(f.name, f.dataType, f.nullable, f.metadata)()))
    -  }
    -
    -  private lazy val v2Options: DataSourceOptions = makeV2Options(options)
    +  override def pushedFilters: Seq[Expression] = Seq.empty
     
    -  // postScanFilters: filters that need to be evaluated after the scan.
    -  // pushedFilters: filters that will be pushed down and evaluated in the underlying data sources.
    -  // Note: postScanFilters and pushedFilters can overlap, e.g. the parquet row group filter.
    -  lazy val (
    -      reader: DataSourceReader,
    -      postScanFilters: Seq[Expression],
    -      pushedFilters: Seq[Expression]) = {
    -    val newReader = userSpecifiedSchema match {
    -      case Some(s) =>
    -        source.asReadSupportWithSchema.createReader(s, v2Options)
    -      case _ =>
    -        source.asReadSupport.createReader(v2Options)
    -    }
    -
    -    DataSourceV2Relation.pushRequiredColumns(newReader, projection.toStructType)
    -
    -    val (postScanFilters, pushedFilters) = filters match {
    -      case Some(filterSeq) =>
    -        DataSourceV2Relation.pushFilters(newReader, filterSeq)
    -      case _ =>
    -        (Nil, Nil)
    -    }
    -    logInfo(s"Post-Scan Filters: ${postScanFilters.mkString(",")}")
    -    logInfo(s"Pushed Filters: ${pushedFilters.mkString(", ")}")
    -
    -    (newReader, postScanFilters, pushedFilters)
    -  }
    -
    -  override def doCanonicalize(): LogicalPlan = {
    -    val c = super.doCanonicalize().asInstanceOf[DataSourceV2Relation]
    +  override def simpleString: String = "RelationV2 " + metadataString
     
    -    // override output with canonicalized output to avoid attempting to configure a reader
    -    val canonicalOutput: Seq[AttributeReference] = this.output
    -        .map(a => QueryPlan.normalizeExprId(a, projection))
    +  lazy val v2Options: DataSourceOptions = makeV2Options(options)
     
    -    new DataSourceV2Relation(c.source, c.options, c.projection) {
    -      override lazy val output: Seq[AttributeReference] = canonicalOutput
    -    }
    +  def newReader: DataSourceReader = userSpecifiedSchema match {
    +    case Some(userSchema) =>
    +      source.asReadSupportWithSchema.createReader(userSchema, v2Options)
    +    case None =>
    +      source.asReadSupport.createReader(v2Options)
       }
     
    -  override def computeStats(): Statistics = reader match {
    +  override def computeStats(): Statistics = newReader match {
         case r: SupportsReportStatistics =>
           Statistics(sizeInBytes = r.getStatistics.sizeInBytes().orElse(conf.defaultSizeInBytes))
         case _ =>
           Statistics(sizeInBytes = conf.defaultSizeInBytes)
       }
     
       override def newInstance(): DataSourceV2Relation = {
    --- End diff --
    
    I thought that initially, but the canonicalization test was failing without this.
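
    For context, the override follows the usual MultiInstanceRelation pattern; a sketch of the body inside DataSourceV2Relation:

        // Re-create each output attribute with a fresh expression ID so the
        // same relation can appear twice in one plan (e.g. a self-join)
        // without attribute ID conflicts.
        override def newInstance(): DataSourceV2Relation = {
          copy(output = output.map(_.newInstance()))
        }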


---
