Posted to reviews@spark.apache.org by ianluyan <gi...@git.apache.org> on 2015/02/05 12:23:31 UTC

[GitHub] spark pull request: [SPARK-5614][SQL] Predicate pushdown through G...

GitHub user ianluyan opened a pull request:

    https://github.com/apache/spark/pull/4394

    [SPARK-5614][SQL] Predicate pushdown through Generate.

    Currently, predicates cannot be pushed through `Generate` nodes in Catalyst's rules. Furthermore, partition pruning in HiveTableScan cannot be applied to queries involving `Generate`, which makes such queries very inefficient. In practice, the new rule finds patterns like
    
    ```scala
    Filter(predicate, Generate(generator, _, _, _, grandChild))
    ```
    
    and splits the predicate into two parts, according to whether each conjunct references a column generated by the Generate node. A new Filter is created for the conjuncts that can be pushed beneath the Generate node; if nothing is left for the original Filter, it is removed.
    For example, the physical plan for the query
    ```sql
    select len, bk
    from s_server lateral view explode(len_arr) len_table as len 
    where len > 5 and day = '20150102';
    ```
    where 'day' is a partition column in the metastore, looks like this in the current version of Spark SQL:
    
    > Project [len, bk]
    > 
    > Filter ((len > "5") && (day = "20150102"))
    > 
    > Generate explode(len_arr), true, false
    > 
    > HiveTableScan [bk, len_arr, day], (MetastoreRelation default, s_server, None), None
    
    But ideally the plan should look like this:
    
    > Project [len, bk]
    > 
    > Filter (len > "5")
    > 
    > Generate explode(len_arr), true, false
    > 
    > HiveTableScan [bk, len_arr, day], (MetastoreRelation default, s_server, None), Some(day = "20150102")
    
    where the partition pruning predicate can be pushed into the HiveTableScan node.
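The split-and-partition scheme described above can be illustrated outside of Catalyst with a toy expression tree. The `Attr`/`Gt`/`And` types below are hypothetical stand-ins, not Catalyst's real `Expression` classes:

```scala
// Minimal sketch of conjunct splitting and pushdown partitioning,
// using a toy expression ADT (hypothetical, not Catalyst's Expression).
sealed trait Expr { def references: Set[String] }
case class Attr(name: String) extends Expr { def references = Set(name) }
case class Gt(attr: Attr, value: Int) extends Expr { def references = attr.references }
case class And(left: Expr, right: Expr) extends Expr {
  def references = left.references ++ right.references
}

object PushdownSketch {
  // Split a predicate tree into its top-level conjuncts.
  def splitConjuncts(e: Expr): List[Expr] = e match {
    case And(l, r) => splitConjuncts(l) ++ splitConjuncts(r)
    case other     => List(other)
  }

  // Conjuncts referencing a generated attribute must stay above Generate;
  // the rest can be pushed beneath it (enabling partition pruning).
  def partitionForPushdown(cond: Expr, generated: Set[String]): (List[Expr], List[Expr]) =
    splitConjuncts(cond).partition(c => c.references.exists(generated.contains))

  def main(args: Array[String]): Unit = {
    val cond = And(Gt(Attr("len"), 5), Gt(Attr("day"), 20150102))
    val (stayUp, pushDown) = partitionForPushdown(cond, Set("len"))
    println(stayUp)   // the conjunct on generated column `len`
    println(pushDown) // the conjunct safe to push toward the table scan
  }
}
```

For the example query, the conjunct on `len` (a generated column) stays above the Generate node, while the conjunct on `day` is pushed down to where partition pruning can use it.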

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ianluyan/spark ppd

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/4394.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4394
    
----
commit d312f20fb00faeda2f0a207fad75555896102e05
Author: Lu Yan <lu...@baidu.com>
Date:   2015-02-05T11:13:39Z

    [SPARK-5614][SQL] Predicate pushdown through Generate.
    
    With this patch, queries involving 'explode' can benefit from predicate pushdown, and thus from partition pruning.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5614][SQL] Predicate pushdown through G...

Posted by chenghao-intel <gi...@git.apache.org>.
Github user chenghao-intel commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4394#discussion_r24238455
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
    @@ -454,6 +455,74 @@ object PushPredicateThroughProject extends Rule[LogicalPlan] {
     }
     
     /**
    + * Push [[Filter]] operators through [[Generate]] operators. Part of the predicate which referenced
    + * attributes generated in [[Generate]] will remain above, and the rest should be pushed beneath.
    + */
    +object PushPredicateThroughGenerate extends Rule[LogicalPlan] {
    +  /**
    +   * Split a condition / predicate into conjuncts
    +   * @param condition predicate
    +   * @return conjuncts
    +   */
    +  private def split(condition: Expression): List[Expression] = condition match {
    +    case And(left, right) => split(left) ++ split(right)
    +    case e: Expression => List(e)
    +  }
    +
    +  /**
    +   * Join a list of conjuncts into a single predicate
    +   * @param conjuncts conjuncts
    +   * @return Some(predicate) or None if conjunct list is empty
    +   */
    +  private def joinConjuncts(conjuncts: Seq[Expression]): Option[Expression] = {
    +    conjuncts.foldRight(None.asInstanceOf[Option[Expression]]) {
    +      (conjunct, joint) => (conjunct, joint) match {
    +        case (c: Expression, Some(j)) => Some(And(c, j))
    +        case (c: Expression, None) => Some(c)
    +      }
    +    }
    +  }
    +
    +  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    +    case filter @ Filter(condition,
    +    generate @ Generate(generator, join, outer, alias, grandChild)) =>
    +      val conjuncts = split(condition)
    +      val referencedAttrsInConjuncts = conjuncts.zip(conjuncts.map(_.references))
    +      val generatedAttrNames = generator match {
    --- End diff --
    
    Use the `alias` instead?




[GitHub] spark pull request: [SPARK-5614][SQL] Predicate pushdown through G...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on the pull request:

    https://github.com/apache/spark/pull/4394#issuecomment-73437714
  
    ok to test




[GitHub] spark pull request: [SPARK-5614][SQL] Predicate pushdown through G...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4394#discussion_r24303688
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
    @@ -454,6 +455,30 @@ object PushPredicateThroughProject extends Rule[LogicalPlan] {
     }
     
     /**
    + * Push [[Filter]] operators through [[Generate]] operators. Part of the predicate which referenced
    --- End diff --
    
    Nit: Part -> Parts, which -> that, referenced -> reference




[GitHub] spark pull request: [SPARK-5614][SQL] Predicate pushdown through G...

Posted by chenghao-intel <gi...@git.apache.org>.
Github user chenghao-intel commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4394#discussion_r24238680
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
    @@ -454,6 +455,74 @@ object PushPredicateThroughProject extends Rule[LogicalPlan] {
     }
     
     /**
    + * Push [[Filter]] operators through [[Generate]] operators. Part of the predicate which referenced
    + * attributes generated in [[Generate]] will remain above, and the rest should be pushed beneath.
    + */
    +object PushPredicateThroughGenerate extends Rule[LogicalPlan] {
    +  /**
    +   * Split a condition / predicate into conjuncts
    +   * @param condition predicate
    +   * @return conjuncts
    +   */
    +  private def split(condition: Expression): List[Expression] = condition match {
    +    case And(left, right) => split(left) ++ split(right)
    +    case e: Expression => List(e)
    +  }
    +
    +  /**
    +   * Join a list of conjuncts into a single predicate
    +   * @param conjuncts conjuncts
    +   * @return Some(predicate) or None if conjunct list is empty
    +   */
    +  private def joinConjuncts(conjuncts: Seq[Expression]): Option[Expression] = {
    +    conjuncts.foldRight(None.asInstanceOf[Option[Expression]]) {
    +      (conjunct, joint) => (conjunct, joint) match {
    +        case (c: Expression, Some(j)) => Some(And(c, j))
    +        case (c: Expression, None) => Some(c)
    +      }
    +    }
    +  }
    +
    +  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    +    case filter @ Filter(condition,
    +    generate @ Generate(generator, join, outer, alias, grandChild)) =>
    +      val conjuncts = split(condition)
    +      val referencedAttrsInConjuncts = conjuncts.zip(conjuncts.map(_.references))
    +      val generatedAttrNames = generator match {
    +        case Explode(attrs, expr) => attrs
    +        case _ => Seq[String]()
    +      }
    +      // Those conjuncts referenced a attributes generated in `generate` can not
    +      // be pushed below `generate`
    +      val (predicatesUp, predicatesDown) = referencedAttrsInConjuncts.partition {
    +        conjunctsWithReferences =>
    +          conjunctsWithReferences._2.exists(attr => generatedAttrNames.contains(attr.name))
    +      }
    +      val upCondition = joinConjuncts(predicatesUp.map(_._1))
    +      val downCondition = joinConjuncts(predicatesDown.map(_._1))
    +      (upCondition, downCondition) match {
    +        case (Some(upPredicate), Some(downPredicate)) =>
    +          // UpPredicate <- Generate <- DownPredicate <- Grand Child
    +          Filter(upPredicate,
    +            generate.copy(generator, join, outer, alias,
    +              Filter(downPredicate, grandChild)))
    +
    +        case (Some(upPredicate), None) =>
    +          filter
    +
    +        case (None, Some(downPredicate)) =>
    +          // Generate <- DownPredicate <- Grand Child
    +          generate.copy(generator, join, outer, alias,
    +            Filter(downPredicate, grandChild))
    +
    +        case (None, None) =>
    --- End diff --
    
    Throw exception instead ?




[GitHub] spark pull request: [SPARK-5614][SQL] Predicate pushdown through G...

Posted by chenghao-intel <gi...@git.apache.org>.
Github user chenghao-intel commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4394#discussion_r24238571
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
    @@ -454,6 +455,74 @@ object PushPredicateThroughProject extends Rule[LogicalPlan] {
     }
     
     /**
    + * Push [[Filter]] operators through [[Generate]] operators. Part of the predicate which referenced
    + * attributes generated in [[Generate]] will remain above, and the rest should be pushed beneath.
    + */
    +object PushPredicateThroughGenerate extends Rule[LogicalPlan] {
    +  /**
    +   * Split a condition / predicate into conjuncts
    +   * @param condition predicate
    +   * @return conjuncts
    +   */
    +  private def split(condition: Expression): List[Expression] = condition match {
    +    case And(left, right) => split(left) ++ split(right)
    +    case e: Expression => List(e)
    +  }
    +
    +  /**
    +   * Join a list of conjuncts into a single predicate
    +   * @param conjuncts conjuncts
    +   * @return Some(predicate) or None if conjunct list is empty
    +   */
    +  private def joinConjuncts(conjuncts: Seq[Expression]): Option[Expression] = {
    +    conjuncts.foldRight(None.asInstanceOf[Option[Expression]]) {
    +      (conjunct, joint) => (conjunct, joint) match {
    +        case (c: Expression, Some(j)) => Some(And(c, j))
    +        case (c: Expression, None) => Some(c)
    +      }
    +    }
    +  }
    +
    +  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    +    case filter @ Filter(condition,
    +    generate @ Generate(generator, join, outer, alias, grandChild)) =>
    +      val conjuncts = split(condition)
    +      val referencedAttrsInConjuncts = conjuncts.zip(conjuncts.map(_.references))
    +      val generatedAttrNames = generator match {
    +        case Explode(attrs, expr) => attrs
    +        case _ => Seq[String]()
    +      }
    +      // Those conjuncts referenced a attributes generated in `generate` can not
    +      // be pushed below `generate`
    +      val (predicatesUp, predicatesDown) = referencedAttrsInConjuncts.partition {
    +        conjunctsWithReferences =>
    --- End diff --
    
    `case (expr, references) =>` would probably be more helpful for understanding what the variables mean




[GitHub] spark pull request: [SPARK-5614][SQL] Predicate pushdown through G...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/4394#issuecomment-73031880
  
    Can one of the admins verify this patch?




[GitHub] spark pull request: [SPARK-5614][SQL] Predicate pushdown through G...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4394#discussion_r24255422
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
    @@ -454,6 +455,74 @@ object PushPredicateThroughProject extends Rule[LogicalPlan] {
     }
     
     /**
    + * Push [[Filter]] operators through [[Generate]] operators. Part of the predicate which referenced
    + * attributes generated in [[Generate]] will remain above, and the rest should be pushed beneath.
    + */
    +object PushPredicateThroughGenerate extends Rule[LogicalPlan] {
    +  /**
    +   * Split a condition / predicate into conjuncts
    +   * @param condition predicate
    +   * @return conjuncts
    +   */
    +  private def split(condition: Expression): List[Expression] = condition match {
    --- End diff --
    
    I think this is duplicated from `PredicateHelper`; I'd mix that in instead.




[GitHub] spark pull request: [SPARK-5614][SQL] Predicate pushdown through G...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/4394#issuecomment-73471566
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/27090/
    Test PASSed.




[GitHub] spark pull request: [SPARK-5614][SQL] Predicate pushdown through G...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4394#discussion_r24270534
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
    @@ -454,6 +455,74 @@ object PushPredicateThroughProject extends Rule[LogicalPlan] {
     }
     
     /**
    + * Push [[Filter]] operators through [[Generate]] operators. Part of the predicate which referenced
    + * attributes generated in [[Generate]] will remain above, and the rest should be pushed beneath.
    + */
    +object PushPredicateThroughGenerate extends Rule[LogicalPlan] {
    +  /**
    +   * Split a condition / predicate into conjuncts
    +   * @param condition predicate
    +   * @return conjuncts
    +   */
    +  private def split(condition: Expression): List[Expression] = condition match {
    +    case And(left, right) => split(left) ++ split(right)
    +    case e: Expression => List(e)
    +  }
    +
    +  /**
    +   * Join a list of conjuncts into a single predicate
    +   * @param conjuncts conjuncts
    +   * @return Some(predicate) or None if conjunct list is empty
    +   */
    +  private def joinConjuncts(conjuncts: Seq[Expression]): Option[Expression] = {
    --- End diff --
    
    +1




[GitHub] spark pull request: [SPARK-5614][SQL] Predicate pushdown through G...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4394#issuecomment-73463062
  
      [Test build #27086 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27086/consoleFull) for   PR 4394 at commit [`c7f0003`](https://github.com/apache/spark/commit/c7f000375889910092cd32487b32264a3be56da7).
     * This patch **fails to build**.
     * This patch **does not merge cleanly**.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-5614][SQL] Predicate pushdown through G...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4394#discussion_r24272029
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
    @@ -454,6 +455,74 @@ object PushPredicateThroughProject extends Rule[LogicalPlan] {
     }
     
     /**
    + * Push [[Filter]] operators through [[Generate]] operators. Part of the predicate which referenced
    + * attributes generated in [[Generate]] will remain above, and the rest should be pushed beneath.
    + */
    +object PushPredicateThroughGenerate extends Rule[LogicalPlan] {
    +  /**
    +   * Split a condition / predicate into conjuncts
    +   * @param condition predicate
    +   * @return conjuncts
    +   */
    +  private def split(condition: Expression): List[Expression] = condition match {
    +    case And(left, right) => split(left) ++ split(right)
    +    case e: Expression => List(e)
    +  }
    +
    +  /**
    +   * Join a list of conjuncts into a single predicate
    +   * @param conjuncts conjuncts
    +   * @return Some(predicate) or None if conjunct list is empty
    +   */
    +  private def joinConjuncts(conjuncts: Seq[Expression]): Option[Expression] = {
    +    conjuncts.foldRight(None.asInstanceOf[Option[Expression]]) {
    +      (conjunct, joint) => (conjunct, joint) match {
    +        case (c: Expression, Some(j)) => Some(And(c, j))
    +        case (c: Expression, None) => Some(c)
    +      }
    +    }
    +  }
    +
    +  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    +    case filter @ Filter(condition,
    +    generate @ Generate(generator, join, outer, alias, grandChild)) =>
    +      val conjuncts = split(condition)
    +      val referencedAttrsInConjuncts = conjuncts.zip(conjuncts.map(_.references))
    +      val generatedAttrNames = generator match {
    +        case Explode(attrs, expr) => attrs
    +        case _ => Seq[String]()
    +      }
    +      // Those conjuncts referenced a attributes generated in `generate` can not
    +      // be pushed below `generate`
    +      val (predicatesUp, predicatesDown) = referencedAttrsInConjuncts.partition {
    +        conjunctsWithReferences =>
    +          conjunctsWithReferences._2.exists(attr => generatedAttrNames.contains(attr.name))
    +      }
    +      val upCondition = joinConjuncts(predicatesUp.map(_._1))
    +      val downCondition = joinConjuncts(predicatesDown.map(_._1))
    +      (upCondition, downCondition) match {
    +        case (Some(upPredicate), Some(downPredicate)) =>
    +          // UpPredicate <- Generate <- DownPredicate <- Grand Child
    +          Filter(upPredicate,
    +            generate.copy(generator, join, outer, alias,
    +              Filter(downPredicate, grandChild)))
    +
    +        case (Some(upPredicate), None) =>
    +          filter
    +
    +        case (None, Some(downPredicate)) =>
    +          // Generate <- DownPredicate <- Grand Child
    +          generate.copy(generator, join, outer, alias,
    +            Filter(downPredicate, grandChild))
    +
    +        case (None, None) =>
    --- End diff --
    
    Hmm, yeah.  It seems to me this is indicative of the fact that the flow above isn't correct.  What about this implementation:
    
    ```scala
    /**
     * Push [[Filter]] operators through [[Generate]] operators. Part of the predicate which referenced
     * attributes generated in [[Generate]] will remain above, and the rest should be pushed beneath.
     */
    object PushPredicateThroughGenerate extends Rule[LogicalPlan] with PredicateHelper {
      def apply(plan: LogicalPlan): LogicalPlan = plan transform {
        case Filter(condition,
               g @ Generate(generator, join, outer, alias, grandChild)) =>
          val (pushDown, stayUp) = splitConjunctivePredicates(condition).partition { predicate =>
            predicate.references subsetOf grandChild.outputSet
          }
    
          if (pushDown.nonEmpty) {
            val withPushDown = g.copy(child = Filter(pushDown.reduce(And), grandChild))
            stayUp.reduceOption(And).map(Filter(_, withPushDown)).getOrElse(withPushDown)
          } else {
            plan
          }
      }
    }
    ```
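A side benefit of this formulation: `reduceOption(And)` collapses a list of conjuncts without the explicit Option fold that `joinConjuncts` needed, and an empty list simply yields `None`, so no `(None, None)` case analysis arises. A standalone sketch of the partition + `reduceOption` pattern, using a toy `Pred` type (hypothetical, not Catalyst's `Expression`):

```scala
// Toy illustration of partition + reduceOption; `Pred` is a hypothetical
// stand-in for Catalyst's Expression.
object ReduceOptionSketch {
  case class Pred(name: String, refs: Set[String])
  // Combine two predicates as a conjunction, merging their references.
  def and(a: Pred, b: Pred): Pred = Pred(s"(${a.name} AND ${b.name})", a.refs ++ b.refs)

  def main(args: Array[String]): Unit = {
    val childOutput = Set("bk", "len_arr", "day")
    val conjuncts = List(Pred("len > 5", Set("len")), Pred("day = '20150102'", Set("day")))
    // Conjuncts whose references are a subset of the child's output can be pushed down.
    val (pushDown, stayUp) = conjuncts.partition(_.refs subsetOf childOutput)
    println(pushDown.reduceOption(and)) // Some(...): the day predicate
    println(stayUp.reduceOption(and))   // Some(...): the len predicate
    println(List.empty[Pred].reduceOption(and)) // None: nothing to wrap in a Filter
  }
}
```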




[GitHub] spark pull request: [SPARK-5614][SQL] Predicate pushdown through G...

Posted by ianluyan <gi...@git.apache.org>.
Github user ianluyan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4394#discussion_r24297449
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
    @@ -454,6 +455,74 @@ object PushPredicateThroughProject extends Rule[LogicalPlan] {
     }
     
     /**
    + * Push [[Filter]] operators through [[Generate]] operators. Part of the predicate which referenced
    + * attributes generated in [[Generate]] will remain above, and the rest should be pushed beneath.
    + */
    +object PushPredicateThroughGenerate extends Rule[LogicalPlan] {
    +  /**
    +   * Split a condition / predicate into conjuncts
    +   * @param condition predicate
    +   * @return conjuncts
    +   */
    +  private def split(condition: Expression): List[Expression] = condition match {
    +    case And(left, right) => split(left) ++ split(right)
    +    case e: Expression => List(e)
    +  }
    +
    +  /**
    +   * Join a list of conjuncts into a single predicate
    +   * @param conjuncts conjuncts
    +   * @return Some(predicate) or None if conjunct list is empty
    +   */
    +  private def joinConjuncts(conjuncts: Seq[Expression]): Option[Expression] = {
    +    conjuncts.foldRight(None.asInstanceOf[Option[Expression]]) {
    +      (conjunct, joint) => (conjunct, joint) match {
    +        case (c: Expression, Some(j)) => Some(And(c, j))
    +        case (c: Expression, None) => Some(c)
    +      }
    +    }
    +  }
    +
    +  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    +    case filter @ Filter(condition,
    +    generate @ Generate(generator, join, outer, alias, grandChild)) =>
    +      val conjuncts = split(condition)
    +      val referencedAttrsInConjuncts = conjuncts.zip(conjuncts.map(_.references))
    +      val generatedAttrNames = generator match {
    +        case Explode(attrs, expr) => attrs
    +        case _ => Seq[String]()
    +      }
    +      // Those conjuncts referenced a attributes generated in `generate` can not
    +      // be pushed below `generate`
    +      val (predicatesUp, predicatesDown) = referencedAttrsInConjuncts.partition {
    +        conjunctsWithReferences =>
    +          conjunctsWithReferences._2.exists(attr => generatedAttrNames.contains(attr.name))
    +      }
    +      val upCondition = joinConjuncts(predicatesUp.map(_._1))
    +      val downCondition = joinConjuncts(predicatesDown.map(_._1))
    +      (upCondition, downCondition) match {
    +        case (Some(upPredicate), Some(downPredicate)) =>
    +          // UpPredicate <- Generate <- DownPredicate <- Grand Child
    +          Filter(upPredicate,
    +            generate.copy(generator, join, outer, alias,
    +              Filter(downPredicate, grandChild)))
    +
    +        case (Some(upPredicate), None) =>
    +          filter
    +
    +        case (None, Some(downPredicate)) =>
    +          // Generate <- DownPredicate <- Grand Child
    +          generate.copy(generator, join, outer, alias,
    +            Filter(downPredicate, grandChild))
    +
    +        case (None, None) =>
    --- End diff --
    
    Yeah, I think you're right.




[GitHub] spark pull request: [SPARK-5614][SQL] Predicate pushdown through G...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/4394#issuecomment-73442116
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/27055/
    Test PASSed.




[GitHub] spark pull request: [SPARK-5614][SQL] Predicate pushdown through G...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4394#issuecomment-73461687
  
      [Test build #27083 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27083/consoleFull) for   PR 4394 at commit [`7b25c70`](https://github.com/apache/spark/commit/7b25c70c399e4ea9bf5a9f7d18571fc6db98bf61).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-5614][SQL] Predicate pushdown through G...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4394#issuecomment-73452232
  
      [Test build #27074 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27074/consoleFull) for   PR 4394 at commit [`c7f0003`](https://github.com/apache/spark/commit/c7f000375889910092cd32487b32264a3be56da7).
     * This patch **fails to build**.
     * This patch **does not merge cleanly**.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-5614][SQL] Predicate pushdown through G...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4394#discussion_r24303675
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
    @@ -454,6 +455,30 @@ object PushPredicateThroughProject extends Rule[LogicalPlan] {
     }
     
     /**
    + * Push [[Filter]] operators through [[Generate]] operators. Part of the predicate which referenced
    + * attributes generated in [[Generate]] will remain above, and the rest should be pushed beneath.
    + */
    +object PushPredicateThroughGenerate extends Rule[LogicalPlan] with PredicateHelper {
    +
    +  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    +    case filter @ Filter(condition,
    +    generate @ Generate(generator, join, outer, alias, grandChild)) =>
    +      // Those conjuncts referenced a attributes generated in `generate` can not
    +      // be pushed below `generate`
    --- End diff --
    
    Nit: "Predicates that reference attributes produced by the `Generate` operator cannot be pushed below the operator."




[GitHub] spark pull request: [SPARK-5614][SQL] Predicate pushdown through G...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4394#issuecomment-73471560
  
      [Test build #27090 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27090/consoleFull) for   PR 4394 at commit [`a67dce9`](https://github.com/apache/spark/commit/a67dce9553f708485bcd3691c86b8967499c8657).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-5614][SQL] Predicate pushdown through G...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/4394#issuecomment-73452235
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/27074/
    Test FAILed.




[GitHub] spark pull request: [SPARK-5614][SQL] Predicate pushdown through G...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4394#issuecomment-73442111
  
      [Test build #27055 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27055/consoleFull) for   PR 4394 at commit [`c03a95b`](https://github.com/apache/spark/commit/c03a95b82263350c6855f6670aedb09462edf5df).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-5614][SQL] Predicate pushdown through G...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/4394#issuecomment-73466577
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/27083/
    Test PASSed.




[GitHub] spark pull request: [SPARK-5614][SQL] Predicate pushdown through G...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on the pull request:

    https://github.com/apache/spark/pull/4394#issuecomment-73620840
  
    Thanks!  Merged to master and 1.3.




[GitHub] spark pull request: [SPARK-5614][SQL] Predicate pushdown through G...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4394#issuecomment-73462661
  
      [Test build #27086 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27086/consoleFull) for   PR 4394 at commit [`c7f0003`](https://github.com/apache/spark/commit/c7f000375889910092cd32487b32264a3be56da7).
     * This patch **does not merge cleanly**.




[GitHub] spark pull request: [SPARK-5614][SQL] Predicate pushdown through G...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4394#discussion_r24270692
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
    @@ -454,6 +455,74 @@ object PushPredicateThroughProject extends Rule[LogicalPlan] {
     }
     
     /**
    + * Push [[Filter]] operators through [[Generate]] operators. Part of the predicate which referenced
    + * attributes generated in [[Generate]] will remain above, and the rest should be pushed beneath.
    + */
    +object PushPredicateThroughGenerate extends Rule[LogicalPlan] {
    +  /**
    +   * Split a condition / predicate into conjuncts
    +   * @param condition predicate
    +   * @return conjuncts
    +   */
    +  private def split(condition: Expression): List[Expression] = condition match {
    +    case And(left, right) => split(left) ++ split(right)
    +    case e: Expression => List(e)
    +  }
    +
    +  /**
    +   * Join a list of conjuncts into a single predicate
    +   * @param conjuncts conjuncts
    +   * @return Some(predicate) or None if conjunct list is empty
    +   */
    +  private def joinConjuncts(conjuncts: Seq[Expression]): Option[Expression] = {
    +    conjuncts.foldRight(None.asInstanceOf[Option[Expression]]) {
    +      (conjunct, joint) => (conjunct, joint) match {
    +        case (c: Expression, Some(j)) => Some(And(c, j))
    +        case (c: Expression, None) => Some(c)
    +      }
    +    }
    +  }
    +
    +  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    +    case filter @ Filter(condition,
    +    generate @ Generate(generator, join, outer, alias, grandChild)) =>
    +      val conjuncts = split(condition)
    +      val referencedAttrsInConjuncts = conjuncts.zip(conjuncts.map(_.references))
    +      val generatedAttrNames = generator match {
    +        case Explode(attrs, expr) => attrs
    +        case _ => Seq[String]()
    +      }
    +      // Those conjuncts referenced a attributes generated in `generate` can not
    +      // be pushed below `generate`
    +      val (predicatesUp, predicatesDown) = referencedAttrsInConjuncts.partition {
    +        conjunctsWithReferences =>
    +          conjunctsWithReferences._2.exists(attr => generatedAttrNames.contains(attr.name))
    --- End diff --
    
    Never compare expressions by name.  Instead use `AttributeSet` and `AttributeMap` when possible.
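    For readers outside Catalyst, the pitfall can be sketched with toy stand-ins (these are illustrative classes, not the real Catalyst API): two attributes may share a name -- say the `len` produced by the generator versus a column also called `len` from the child -- while being distinct attributes. Catalyst disambiguates with unique expression ids, and `AttributeSet` membership is id-based rather than name-based.

```scala
// Toy stand-in for a Catalyst attribute: a name plus a unique expression id.
// Not the real API -- just enough to show why name comparison is unsafe.
case class Attr(name: String, exprId: Long)

object AttrSetDemo {
  def main(args: Array[String]): Unit = {
    val generated = Attr("len", exprId = 2) // produced by Generate
    val fromChild = Attr("len", exprId = 1) // same name, different attribute

    // Name-based check: false positive -- the child predicate would
    // wrongly be kept above Generate instead of being pushed down.
    val generatedNames = Set(generated.name)
    println(generatedNames.contains(fromChild.name)) // true

    // Id-based check (the AttributeSet-style comparison): correctly
    // reports that fromChild is NOT a generated attribute.
    val generatedIds = Set(generated.exprId)
    println(generatedIds.contains(fromChild.exprId)) // false
  }
}
```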




[GitHub] spark pull request: [SPARK-5614][SQL] Predicate pushdown through G...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/4394




[GitHub] spark pull request: [SPARK-5614][SQL] Predicate pushdown through G...

Posted by chenghao-intel <gi...@git.apache.org>.
Github user chenghao-intel commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4394#discussion_r24238399
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
    @@ -454,6 +455,74 @@ object PushPredicateThroughProject extends Rule[LogicalPlan] {
     }
     
     /**
    + * Push [[Filter]] operators through [[Generate]] operators. Part of the predicate which referenced
    + * attributes generated in [[Generate]] will remain above, and the rest should be pushed beneath.
    + */
    +object PushPredicateThroughGenerate extends Rule[LogicalPlan] {
    +  /**
    +   * Split a condition / predicate into conjuncts
    +   * @param condition predicate
    +   * @return conjuncts
    +   */
    +  private def split(condition: Expression): List[Expression] = condition match {
    +    case And(left, right) => split(left) ++ split(right)
    +    case e: Expression => List(e)
    +  }
    +
    +  /**
    +   * Join a list of conjuncts into a single predicate
    +   * @param conjuncts conjuncts
    +   * @return Some(predicate) or None if conjunct list is empty
    +   */
    +  private def joinConjuncts(conjuncts: Seq[Expression]): Option[Expression] = {
    --- End diff --
    
    `conjuncts.reduceRight(And)` instead?
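    For readers following along outside Catalyst, the suggestion can be sketched with toy stand-ins (the real `Expression`/`And` live in org.apache.spark.sql.catalyst.expressions). `reduceRight(And)` replaces the manual foldRight-over-Option in `joinConjuncts`; since it throws on an empty list, `reduceRightOption` gives the same `Option[Expression]` behaviour as the original.

```scala
// Toy expression ADT, standing in for Catalyst's Expression tree.
sealed trait Expr
case class Pred(name: String) extends Expr
case class And(left: Expr, right: Expr) extends Expr

object ConjunctDemo {
  // Split a predicate into its conjuncts (same shape as the PR's `split`).
  def split(condition: Expr): List[Expr] = condition match {
    case And(l, r) => split(l) ++ split(r)
    case e         => List(e)
  }

  // The reviewer's one-liner: rebuild a single predicate from conjuncts.
  // reduceRightOption returns None for an empty list, matching the
  // Option[Expression] contract of the original joinConjuncts.
  def join(conjuncts: Seq[Expr]): Option[Expr] =
    conjuncts.reduceRightOption(And.apply)

  def main(args: Array[String]): Unit = {
    val cond = And(Pred("a"), And(Pred("b"), Pred("c")))
    println(split(cond))       // three conjuncts: a, b, c
    println(join(split(cond))) // recombined right-nested And
    println(join(Nil))         // None
  }
}
```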




[GitHub] spark pull request: [SPARK-5614][SQL] Predicate pushdown through G...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4394#issuecomment-73466573
  
      [Test build #27083 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27083/consoleFull) for   PR 4394 at commit [`7b25c70`](https://github.com/apache/spark/commit/7b25c70c399e4ea9bf5a9f7d18571fc6db98bf61).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-5614][SQL] Predicate pushdown through G...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4394#issuecomment-73437790
  
      [Test build #27055 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27055/consoleFull) for   PR 4394 at commit [`c03a95b`](https://github.com/apache/spark/commit/c03a95b82263350c6855f6670aedb09462edf5df).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-5614][SQL] Predicate pushdown through G...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/4394#issuecomment-73463064
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/27086/
    Test FAILed.




[GitHub] spark pull request: [SPARK-5614][SQL] Predicate pushdown through G...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4394#issuecomment-73465477
  
      [Test build #27090 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27090/consoleFull) for   PR 4394 at commit [`a67dce9`](https://github.com/apache/spark/commit/a67dce9553f708485bcd3691c86b8967499c8657).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-5614][SQL] Predicate pushdown through G...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4394#issuecomment-73451822
  
      [Test build #27074 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27074/consoleFull) for   PR 4394 at commit [`c7f0003`](https://github.com/apache/spark/commit/c7f000375889910092cd32487b32264a3be56da7).
     * This patch **does not merge cleanly**.

