Posted to reviews@spark.apache.org by aokolnychyi <gi...@git.apache.org> on 2017/07/20 20:32:49 UTC

[GitHub] spark pull request #18692: [SPARK-21417][SQL] Detect join conditions via fi...

GitHub user aokolnychyi opened a pull request:

    https://github.com/apache/spark/pull/18692

    [SPARK-21417][SQL] Detect join conditions via filter expressions

    ## What changes were proposed in this pull request?
    
    This PR adds an optimization rule that infers join conditions from the specified filter expressions.
    
    For example, 
    `SELECT * FROM t1, t2 WHERE t1.col1 = 1 AND t2.col2 = 1` 
    can be transformed into 
    `SELECT * FROM t1 JOIN t2 ON t1.col1 = t2.col2 WHERE t1.col1 = 1 AND t2.col2 = 1`.
    
    Refer to the corresponding ticket and tests for more details.
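    
    A minimal sketch of the inference, using a toy expression model instead of Catalyst. `Attr`, `Lit`, `EqualTo` and `JoinInference` are hypothetical names used only for illustration. If one attribute from each relation is constrained to the same literal, the two attributes are equal on every row that passes the filters, so an equi-join predicate can be added:
    
    ```scala
    // Toy model, NOT Catalyst: hypothetical classes for illustration only.
    sealed trait Expr
    final case class Attr(table: String, name: String) extends Expr
    final case class Lit(value: Int) extends Expr
    final case class EqualTo(left: Expr, right: Expr) extends Expr
    
    object JoinInference {
      // Collect (attribute, literal) pairs from constraints of the form attr = lit or lit = attr.
      private def literalBindings(constraints: Seq[Expr]): Seq[(Attr, Lit)] =
        constraints.collect {
          case EqualTo(a: Attr, l: Lit) => (a, l)
          case EqualTo(l: Lit, a: Attr) => (a, l)
        }
    
      // Infer leftAttr = rightAttr whenever both attributes are pinned to the same literal.
      def inferJoinPredicates(left: Seq[Expr], right: Seq[Expr]): Seq[Expr] =
        for {
          (la, ll) <- literalBindings(left)
          (ra, rl) <- literalBindings(right)
          if ll == rl
        } yield EqualTo(la, ra)
    }
    ```
    
    For the query above, `t1.col1 = 1` on the left and `t2.col2 = 1` on the right yield the single predicate `EqualTo(Attr("t1", "col1"), Attr("t2", "col2"))`.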
    
    ## How was this patch tested?
    
    This patch adds a new test suite that covers the implemented logic.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/aokolnychyi/spark spark-21417

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/18692.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #18692
    
----
commit e67d4d3c0bdf5cac4c6b17b50314984a2a6378d2
Author: aokolnychyi <an...@sap.com>
Date:   2017-07-18T18:49:16Z

    [SPARK-21417][SQL] Detect joind conditions via filter expressions

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18692: [SPARK-21417][SQL] Infer join conditions using propagate...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18692
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82892/


---



[GitHub] spark issue #18692: [SPARK-21417][SQL] Infer join conditions using propagate...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on the issue:

    https://github.com/apache/spark/pull/18692
  
    Yeah. That case is wrong. Let us revisit it if we find a useful case here. Thank you!


---



[GitHub] spark issue #18692: [SPARK-21417][SQL] Infer join conditions using propagate...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18692
  
    Merged build finished. Test PASSed.


---


[GitHub] spark pull request #18692: [SPARK-21417][SQL] Infer join conditions using pr...

Posted by aokolnychyi <gi...@git.apache.org>.
Github user aokolnychyi commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18692#discussion_r144722742
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
    @@ -152,3 +152,71 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
           if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
       }
     }
    +
    +/**
    + * A rule that uses propagated constraints to infer join conditions. The optimization is applicable
    + * only to CROSS joins.
    + *
    + * For instance, if there is a CROSS join, where the left relation has 'a = 1' and the right
    + * relation has 'b = 1', then the rule infers 'a = b' as a join predicate.
    + */
    +object InferJoinConditionsFromConstraints extends Rule[LogicalPlan] with PredicateHelper {
    +
    +  def apply(plan: LogicalPlan): LogicalPlan = {
    +    if (SQLConf.get.constraintPropagationEnabled) {
    +      inferJoinConditions(plan)
    +    } else {
    +      plan
    +    }
    +  }
    +
    +  private def inferJoinConditions(plan: LogicalPlan): LogicalPlan = plan transform {
    +    case join @ Join(left, right, Cross, conditionOpt) =>
    +      val leftConstraints = join.constraints.filter(_.references.subsetOf(left.outputSet))
    +      val rightConstraints = join.constraints.filter(_.references.subsetOf(right.outputSet))
    --- End diff --
    
    @gengliangwang Yeah, makes sense. So, ``PushPredicateThroughJoin`` would push the WHERE clause into the join, and the proposed rule would infer ``t1.col1 = t2.col1`` and change the join type to INNER. As a result, the final join condition would be ``t1.col1 = t2.col1 and t1.col1 >= t2.col1 and (t1.col1 = t1.col2 + t2.col2 and t2.col1 = t1.col2 + t2.col2)``. Am I right?
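    
    A rough sketch of how such a final condition is assembled. It uses a toy model where predicates are opaque strings, and `Pred`, `And` and `ConditionBuilder` are hypothetical stand-ins rather than Catalyst classes. The existing condition is split into conjuncts, only inferred predicates that are not already present are kept, and those are prepended. This loosely mirrors the `newConditionOpt` logic in the PR:
    
    ```scala
    // Toy model, NOT Catalyst: predicates are opaque strings for illustration.
    sealed trait Cond
    final case class Pred(sql: String) extends Cond
    final case class And(left: Cond, right: Cond) extends Cond
    
    object ConditionBuilder {
      // Flatten nested Ands into conjuncts (similar in spirit to splitConjunctivePredicates).
      def splitConjuncts(c: Cond): Seq[Cond] = c match {
        case And(l, r) => splitConjuncts(l) ++ splitConjuncts(r)
        case other     => Seq(other)
      }
    
      // Prepend inferred predicates that are not already conjuncts of the existing condition.
      // None means nothing new was inferred, so the join would be left unchanged.
      def newCondition(inferred: Seq[Cond], existing: Option[Cond]): Option[Cond] =
        existing match {
          case Some(cond) =>
            val present = splitConjuncts(cond).toSet
            val fresh = inferred.filterNot(present.contains)
            if (fresh.nonEmpty) Some(And(fresh.reduceLeft[Cond](And(_, _)), cond)) else None
          case None =>
            inferred.reduceOption[Cond](And(_, _))
        }
    }
    ```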


---



[GitHub] spark issue #18692: [SPARK-21417][SQL] Infer join conditions using propagate...

Posted by aokolnychyi <gi...@git.apache.org>.
Github user aokolnychyi commented on the issue:

    https://github.com/apache/spark/pull/18692
  
    @gatorsmile What is our decision here? Shall we wait until SPARK-21652 is resolved? In the meantime, I can add some tests and check how the proposed rule interacts with the other optimizer rules.


---


[GitHub] spark issue #18692: [SPARK-21417][SQL] Infer join conditions using propagate...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18692
  
    **[Test build #84351 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84351/testReport)** for PR 18692 at commit [`9ab91a1`](https://github.com/apache/spark/commit/9ab91a19cefd63b7d28674992b68da8164d487ae).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #18692: [SPARK-21417][SQL] Infer join conditions using pr...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18692#discussion_r153060560
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
    @@ -152,3 +152,99 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
           if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
       }
     }
    +
    +/**
    + * A rule that eliminates CROSS joins by inferring join conditions from propagated constraints.
    + *
    + * The optimization is applicable only to CROSS joins. For other join types, adding inferred join
    + * conditions would potentially shuffle children as child node's partitioning won't satisfy the JOIN
    + * node's requirements which otherwise could have.
    + *
    + * For instance, if there is a CROSS join, where the left relation has 'a = 1' and the right
    + * relation has 'b = 1', the rule infers 'a = b' as a join predicate.
    + */
    +object EliminateCrossJoin extends Rule[LogicalPlan] with PredicateHelper {
    +
    +  def apply(plan: LogicalPlan): LogicalPlan = {
    +    if (SQLConf.get.constraintPropagationEnabled) {
    +      eliminateCrossJoin(plan)
    +    } else {
    +      plan
    +    }
    +  }
    +
    +  private def eliminateCrossJoin(plan: LogicalPlan): LogicalPlan = plan transform {
    +    case join@Join(leftPlan, rightPlan, Cross, None) =>
    --- End diff --
    
    Nit: `join@Join` -> `join @ Join`


---



[GitHub] spark issue #18692: [SPARK-21417][SQL] Infer join conditions using propagate...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18692
  
    **[Test build #84351 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84351/testReport)** for PR 18692 at commit [`9ab91a1`](https://github.com/apache/spark/commit/9ab91a19cefd63b7d28674992b68da8164d487ae).


---



[GitHub] spark pull request #18692: [SPARK-21417][SQL] Infer join conditions using pr...

Posted by aokolnychyi <gi...@git.apache.org>.
Github user aokolnychyi commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18692#discussion_r152660385
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
    @@ -152,3 +152,71 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
           if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
       }
     }
    +
    +/**
    + * A rule that uses propagated constraints to infer join conditions. The optimization is applicable
    + * only to CROSS joins.
    + *
    + * For instance, if there is a CROSS join, where the left relation has 'a = 1' and the right
    + * relation has 'b = 1', then the rule infers 'a = b' as a join predicate.
    + */
    +object InferJoinConditionsFromConstraints extends Rule[LogicalPlan] with PredicateHelper {
    +
    +  def apply(plan: LogicalPlan): LogicalPlan = {
    +    if (SQLConf.get.constraintPropagationEnabled) {
    +      inferJoinConditions(plan)
    +    } else {
    +      plan
    +    }
    +  }
    +
    +  private def inferJoinConditions(plan: LogicalPlan): LogicalPlan = plan transform {
    +    case join @ Join(left, right, Cross, conditionOpt) =>
    +      val leftConstraints = join.constraints.filter(_.references.subsetOf(left.outputSet))
    +      val rightConstraints = join.constraints.filter(_.references.subsetOf(right.outputSet))
    +      val inferredJoinPredicates = inferJoinPredicates(leftConstraints, rightConstraints)
    +
    +      val newConditionOpt = conditionOpt match {
    +        case Some(condition) =>
    +          val existingPredicates = splitConjunctivePredicates(condition)
    +          val newPredicates = findNewPredicates(inferredJoinPredicates, existingPredicates)
    +          if (newPredicates.nonEmpty) Some(And(newPredicates.reduce(And), condition)) else None
    +        case None =>
    +          inferredJoinPredicates.reduceOption(And)
    +      }
    +      if (newConditionOpt.isDefined) Join(left, right, Inner, newConditionOpt) else join
    --- End diff --
    
    @gatorsmile Thanks for getting back.
    
    ``CheckCartesianProducts`` identifies a join of type ``Inner | LeftOuter | RightOuter | FullOuter`` as a cartesian product if there is no join predicate that has references to both relations.
    
    If we agree to ignore joins of type Cross that have a condition (in this PR), then the use case in this [discussion](https://github.com/apache/spark/pull/18692#discussion_r144466472) is no longer possible (even if you remove t1.col1 >= t2.col1). Correct? ``PushPredicateThroughJoin`` will push ``t1.col1 = t1.col2 + t2.col2 and t2.col1 = t1.col2 + t2.col2`` into the join condition, the proposed rule will not infer anything, and the final join will be of type Cross with a condition that covers both relations. According to the logic of ``CheckCartesianProducts``, it is not considered a cartesian product (since there exists a join predicate that covers both relations, e.g. ``t1.col1 = t1.col2 + t2.col2``).
    
    So, if I have a confirmation that we need to consider only joins of type Cross and without any join conditions, I can update the PR accordingly.
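    
    A minimal sketch of the `CheckCartesianProducts` criterion as described above. It uses a toy model where each conjunct is reduced to the attribute names it references. `JoinType`, `Conjunct` and `CartesianCheck` are hypothetical names, and this is not Spark's actual implementation. Under this model, a condition containing a conjunct such as `t1.col1 = t1.col2 + t2.col2` is not flagged:
    
    ```scala
    // Toy model, NOT Spark's CheckCartesianProducts: hypothetical types for illustration.
    sealed trait JoinType
    case object Inner extends JoinType
    case object LeftOuter extends JoinType
    case object RightOuter extends JoinType
    case object FullOuter extends JoinType
    case object Cross extends JoinType
    
    // A conjunct of the join condition, reduced to the attribute names it references.
    final case class Conjunct(sql: String, references: Set[String])
    
    object CartesianCheck {
      // Flags Inner/LeftOuter/RightOuter/FullOuter joins whose condition has no conjunct
      // referencing attributes from both sides; explicit Cross joins are never flagged.
      def isCartesianProduct(
          joinType: JoinType,
          leftOutput: Set[String],
          rightOutput: Set[String],
          conjuncts: Seq[Conjunct]): Boolean = joinType match {
        case Inner | LeftOuter | RightOuter | FullOuter =>
          !conjuncts.exists { c =>
            c.references.exists(leftOutput.contains) && c.references.exists(rightOutput.contains)
          }
        case Cross => false
      }
    }
    ```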


---



[GitHub] spark pull request #18692: [SPARK-21417][SQL] Infer join conditions using pr...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18692#discussion_r153060551
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
    @@ -152,3 +152,99 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
           if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
       }
     }
    +
    +/**
    + * A rule that eliminates CROSS joins by inferring join conditions from propagated constraints.
    + *
    + * The optimization is applicable only to CROSS joins. For other join types, adding inferred join
    + * conditions would potentially shuffle children as child node's partitioning won't satisfy the JOIN
    + * node's requirements which otherwise could have.
    + *
    + * For instance, if there is a CROSS join, where the left relation has 'a = 1' and the right
    + * relation has 'b = 1', the rule infers 'a = b' as a join predicate.
    --- End diff --
    
    > For instance, given a CROSS join with the constraint 'a = 1' from the left child and the constraint 'b = 1' from the right child, this rule infers a new join predicate 'a = b' and converts the join to an Inner join.


---



[GitHub] spark issue #18692: [SPARK-21417][SQL] Detect join conditions via filter ex...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18692
  
    **[Test build #80056 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/80056/testReport)** for PR 18692 at commit [`915dc7e`](https://github.com/apache/spark/commit/915dc7ecb1891ce7387e49b8eab915049bd34f93).
     * This patch **fails Scala style tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---


[GitHub] spark pull request #18692: [SPARK-21417][SQL] Infer join conditions using pr...

Posted by gengliangwang <gi...@git.apache.org>.
Github user gengliangwang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18692#discussion_r144466472
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
    @@ -152,3 +152,71 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
           if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
       }
     }
    +
    +/**
    + * A rule that uses propagated constraints to infer join conditions. The optimization is applicable
    + * only to CROSS joins.
    + *
    + * For instance, if there is a CROSS join, where the left relation has 'a = 1' and the right
    + * relation has 'b = 1', then the rule infers 'a = b' as a join predicate.
    + */
    +object InferJoinConditionsFromConstraints extends Rule[LogicalPlan] with PredicateHelper {
    +
    +  def apply(plan: LogicalPlan): LogicalPlan = {
    +    if (SQLConf.get.constraintPropagationEnabled) {
    +      inferJoinConditions(plan)
    +    } else {
    +      plan
    +    }
    +  }
    +
    +  private def inferJoinConditions(plan: LogicalPlan): LogicalPlan = plan transform {
    +    case join @ Join(left, right, Cross, conditionOpt) =>
    +      val leftConstraints = join.constraints.filter(_.references.subsetOf(left.outputSet))
    +      val rightConstraints = join.constraints.filter(_.references.subsetOf(right.outputSet))
    --- End diff --
    
    I don't think we need to split the constraints into left-only and right-only ones.
    The following case can infer `t1.col1 = t2.col1`:
    ```scala
    Seq((1, 2)).toDF("col1", "col2").write.saveAsTable("t1")
    Seq((1, 2)).toDF("col1", "col2").write.saveAsTable("t2")
    val df = spark.sql("SELECT * FROM t1 CROSS JOIN t2 ON t1.col1 >= t2.col1 " +
       "WHERE t1.col1 = t1.col2 + t2.col2 and t2.col1 = t1.col2 + t2.col2")
    ```
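    
    A minimal sketch of this point with a toy expression model. `Attr`, `Add`, `EqualTo` and `TransitiveJoinInference` are hypothetical names, not Catalyst classes. Both constraints reference the two relations, so keeping only left-only and right-only constraints discards them. Matching `a = e` and `b = e` over all constraints still yields `t1.col1 = t2.col1`:
    
    ```scala
    // Toy model, NOT Catalyst: hypothetical classes for illustration.
    sealed trait Expr { def refs: Set[String] }
    final case class Attr(name: String) extends Expr { def refs: Set[String] = Set(name) }
    final case class Add(l: Expr, r: Expr) extends Expr { def refs: Set[String] = l.refs ++ r.refs }
    final case class EqualTo(l: Expr, r: Expr) extends Expr { def refs: Set[String] = l.refs ++ r.refs }
    
    object TransitiveJoinInference {
      // From constraints a = e and b = e (same e), infer a = b when a belongs to the
      // left output and b to the right output.
      def infer(constraints: Seq[Expr], leftOut: Set[String], rightOut: Set[String]): Seq[Expr] = {
        val attrEqualities: Seq[(Attr, Expr)] =
          constraints.collect { case EqualTo(a: Attr, e) => (a, e) }
        for {
          (a, e1) <- attrEqualities
          (b, e2) <- attrEqualities
          if e1 == e2 && leftOut.contains(a.name) && rightOut.contains(b.name)
        } yield EqualTo(a, b)
      }
    
      // The PR's original filtering: keep only constraints that reference a single side.
      def sideOnly(constraints: Seq[Expr], output: Set[String]): Seq[Expr] =
        constraints.filter(_.refs.subsetOf(output))
    }
    ```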



---



[GitHub] spark issue #18692: [SPARK-21417][SQL] Infer join conditions using propagate...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18692
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81390/


---



[GitHub] spark issue #18692: [SPARK-21417][SQL] Infer join conditions using propagate...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on the issue:

    https://github.com/apache/spark/pull/18692
  
    In this PR, we should limit it to `cartesian product` for now. In the future, we need to be smarter when extracting equi-join keys.


---


[GitHub] spark pull request #18692: [SPARK-21417][SQL] Infer join conditions using pr...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18692#discussion_r152725251
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
    @@ -152,3 +152,71 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
           if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
       }
     }
    +
    +/**
    + * A rule that uses propagated constraints to infer join conditions. The optimization is applicable
    + * only to CROSS joins.
    + *
    + * For instance, if there is a CROSS join, where the left relation has 'a = 1' and the right
    + * relation has 'b = 1', then the rule infers 'a = b' as a join predicate.
    + */
    +object InferJoinConditionsFromConstraints extends Rule[LogicalPlan] with PredicateHelper {
    +
    +  def apply(plan: LogicalPlan): LogicalPlan = {
    +    if (SQLConf.get.constraintPropagationEnabled) {
    +      inferJoinConditions(plan)
    +    } else {
    +      plan
    +    }
    +  }
    +
    +  private def inferJoinConditions(plan: LogicalPlan): LogicalPlan = plan transform {
    +    case join @ Join(left, right, Cross, conditionOpt) =>
    +      val leftConstraints = join.constraints.filter(_.references.subsetOf(left.outputSet))
    +      val rightConstraints = join.constraints.filter(_.references.subsetOf(right.outputSet))
    +      val inferredJoinPredicates = inferJoinPredicates(leftConstraints, rightConstraints)
    +
    +      val newConditionOpt = conditionOpt match {
    +        case Some(condition) =>
    +          val existingPredicates = splitConjunctivePredicates(condition)
    +          val newPredicates = findNewPredicates(inferredJoinPredicates, existingPredicates)
    +          if (newPredicates.nonEmpty) Some(And(newPredicates.reduce(And), condition)) else None
    +        case None =>
    +          inferredJoinPredicates.reduceOption(And)
    +      }
    +      if (newConditionOpt.isDefined) Join(left, right, Inner, newConditionOpt) else join
    --- End diff --
    
    Yes. In this PR, we just need to consider cross join without any join condition. 
    
    In the future, we can extend it. 
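    
    A minimal sketch of the scope agreed here, using a toy `Join` model with string predicates. `Join`, `Inner`, `Cross` and `EliminateCrossJoinSketch` are hypothetical stand-ins, not Catalyst's classes. Only a CROSS join with no condition is rewritten, and only when something was inferred. Every other join is returned untouched:
    
    ```scala
    // Toy model, NOT Catalyst's Join: hypothetical classes for illustration.
    sealed trait JoinType
    case object Inner extends JoinType
    case object Cross extends JoinType
    
    final case class Join(left: String, right: String, joinType: JoinType, condition: Option[String])
    
    object EliminateCrossJoinSketch {
      // `inferred` stands in for predicates derived from the children's constraints.
      def apply(join: Join, inferred: Seq[String]): Join = join match {
        case Join(l, r, Cross, None) if inferred.nonEmpty =>
          Join(l, r, Inner, Some(inferred.mkString(" AND ")))
        case other => other
      }
    }
    ```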


---



[GitHub] spark pull request #18692: [SPARK-21417][SQL] Infer join conditions using pr...

Posted by aokolnychyi <gi...@git.apache.org>.
Github user aokolnychyi commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18692#discussion_r137343433
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
    @@ -152,3 +152,71 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
           if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
       }
     }
    +
    +/**
    + * A rule that uses propagated constraints to infer join conditions. The optimization is applicable
    + * only to CROSS joins.
    + *
    + * For instance, if there is a CROSS join, where the left relation has 'a = 1' and the right
    + * relation has 'b = 1', then the rule infers 'a = b' as a join predicate.
    + */
    +object InferJoinConditionsFromConstraints extends Rule[LogicalPlan] with PredicateHelper {
    --- End diff --
    
    I also thought about this, but `InferFiltersFromConstraints` does not change the types of the joins it processes. Therefore, I kept them separate. In addition, I considered renaming it to `EliminateCrossJoin`.


---



[GitHub] spark issue #18692: [SPARK-21417][SQL] Infer join conditions using propagate...

Posted by SimonBin <gi...@git.apache.org>.
Github user SimonBin commented on the issue:

    https://github.com/apache/spark/pull/18692
  
    @aokolnychyi thank you for the clarification, I see now


---



[GitHub] spark issue #18692: [SPARK-21417][SQL] Detect join conditions via filter ex...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/18692
  
    I think we already did it via constraint propagation, didn't we?


---


[GitHub] spark pull request #18692: [SPARK-21417][SQL] Infer join conditions using pr...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18692#discussion_r152412423
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
    @@ -152,3 +152,71 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
           if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
       }
     }
    +
    +/**
    + * A rule that uses propagated constraints to infer join conditions. The optimization is applicable
    + * only to CROSS joins.
    + *
    + * For instance, if there is a CROSS join, where the left relation has 'a = 1' and the right
    + * relation has 'b = 1', then the rule infers 'a = b' as a join predicate.
    + */
    +object InferJoinConditionsFromConstraints extends Rule[LogicalPlan] with PredicateHelper {
    --- End diff --
    
    Yes. Since we decided to focus on cross joins only, we should rename it to `EliminateCrossJoin`, as you proposed.


---



[GitHub] spark issue #18692: [SPARK-21417][SQL] Detect join conditions via filter ex...

Posted by aokolnychyi <gi...@git.apache.org>.
Github user aokolnychyi commented on the issue:

    https://github.com/apache/spark/pull/18692
  
    @gatorsmile I took a look at the case above. Indeed, the proposed rule triggers this issue, but only indirectly. In the example above, the optimizer never reaches a fixed point. Please find my investigation below.
    
    ```
    ... 
    
    // The new rule infers correct join predicates
    Join Inner, ((col2#33 = col#34) && (col1#32 = col#34))
    :- Filter ((col1#32 = col2#33) && (col1#32 = 1))
    :  +- Relation[col1#32,col2#33] parquet
    +- Filter (col#34 = 1)
       +- Relation[col#34] parquet
    
    // InferFiltersFromConstraints adds more filters
    Join Inner, ((col2#33 = col#34) && (col1#32 = col#34))
    :- Filter ((((col2#33 = 1) && isnotnull(col1#32)) && isnotnull(col2#33)) && ((col1#32 = col2#33) && (col1#32 = 1)))
    :  +- Relation[col1#32,col2#33] parquet
    +- Filter (isnotnull(col#34) && (col#34 = 1))
       +- Relation[col#34] parquet
    
    // ConstantPropagation is applied
    Join Inner, ((col2#33 = col#34) && (col1#32 = col#34))
    !:- Filter (((((col2#33 = 1) && isnotnull(col2#33)) && isnotnull(col1#32)) && ((1 = col2#33) && (col1#32 = 1))) 
     :  +- Relation[col1#32,col2#33] parquet
     +- Filter (isnotnull(col#34) && (col#34 = 1))
        +- Relation[col#34] parquet                          
    
    // (Important) InferFiltersFromConstraints infers (col1#32 = col2#33), which is added to the join condition.
    Join Inner, ((col1#32 = col2#33) && ((col2#33 = col#34) && (col1#32 = col#34)))
    !:- Filter (((((col2#33 = 1) && isnotnull(col2#33)) && isnotnull(col1#32)) && ((1 = col2#33) && (col1#32 = 1))) 
     :  +- Relation[col1#32,col2#33] parquet
     +- Filter (isnotnull(col#34) && (col#34 = 1))
        +- Relation[col#34] parquet
    
    // PushPredicateThroughJoin pushes down (col1#32 = col2#33), and then CombineFilters produces:
    Join Inner, ((col1#32 = col#34) && (col2#33 = col#34))
    !:- Filter ((((isnotnull(col1#32) && (col2#33 = 1)) && isnotnull(col2#33)) && ((1 = col2#33) && (col1#32 = 1))) && (col2#33 = col1#32))
     :  +- Relation[col1#32,col2#33] parquet
     +- Filter (isnotnull(col#34) && (col#34 = 1))
        +- Relation[col#34] parquet                                                                      
    
    ```
    After that, `ConstantPropagation` replaces `(col2#33 = col1#32)` with `(1 = 1)`, `BooleanSimplification` removes `(1 = 1)`, `InferFiltersFromConstraints` infers `(col2#33 = col1#32)` again, and the procedure repeats forever. Since `InferFiltersFromConstraints` is the last optimization rule, the final plan keeps the redundant condition you mentioned. Even without the new rule, the optimizer does not converge on the following query:
    
    ```scala
    Seq((1, 2)).toDF("col1", "col2").write.saveAsTable("t1")
    Seq(1, 2).toDF("col").write.saveAsTable("t2")
    spark.sql("SELECT * FROM t1, t2 WHERE t1.col1 = 1 AND 1 = t1.col2 AND t1.col1 = t2.col AND t1.col2 = t2.col").explain(true)
    ```
    Correct me if I am wrong, but it seems like an issue with the existing rules.
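    
    A minimal sketch of why a batch that never reaches a fixed point still stops, and keeps whatever its last pass produced. This is a toy stand-in, not Spark's `RuleExecutor`, which runs optimizer batches under a `FixedPoint` strategy with a maximum iteration count. Here the plan is just a list of join-condition conjuncts. `dropAndReinfer` is a hypothetical single rule that imitates the multi-rule cycle traced above: it drops the redundant equality and re-adds it with the operands swapped, so no two consecutive passes agree. `inferOnce` is a well-behaved comparison rule:
    
    ```scala
    // Toy model, NOT Spark's RuleExecutor: a "plan" is a list of join-condition conjuncts.
    object FixedPointSketch {
      type Plan = List[String]
      type Rule = Plan => Plan
    
      // Apply the batch repeatedly until one full pass leaves the plan unchanged or
      // maxIterations passes have run. Returns (final plan, passes run, converged).
      def execute(plan: Plan, batch: Seq[Rule], maxIterations: Int): (Plan, Int, Boolean) = {
        var current = plan
        var passes = 0
        var converged = false
        while (!converged && passes < maxIterations) {
          val next = batch.foldLeft(current)((p, rule) => rule(p))
          passes += 1
          converged = next == current
          current = next
        }
        (current, passes, converged)
      }
    
      // Hypothetical stand-in for the cycle: drop the equality, re-infer it with operands swapped.
      val dropAndReinfer: Rule = p =>
        if (p.contains("col1 = col2")) p.filterNot(_ == "col1 = col2") :+ "col2 = col1"
        else p.filterNot(_ == "col2 = col1") :+ "col1 = col2"
    
      // Well-behaved comparison: adds the equality once and then leaves the plan alone.
      val inferOnce: Rule = p => if (p.contains("col1 = col2")) p else p :+ "col1 = col2"
    }
    ```
    
    With `dropAndReinfer`, the loop stops only because of the cap, and the returned plan is simply the output of the last pass.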


---


[GitHub] spark issue #18692: [SPARK-21417][SQL] Infer join conditions using propagate...

Posted by aokolnychyi <gi...@git.apache.org>.
Github user aokolnychyi commented on the issue:

    https://github.com/apache/spark/pull/18692
  
    Yeah, correct. So, we should revert then.


---



[GitHub] spark issue #18692: [SPARK-21417][SQL] Infer join conditions using propagate...

Posted by aokolnychyi <gi...@git.apache.org>.
Github user aokolnychyi commented on the issue:

    https://github.com/apache/spark/pull/18692
  
    I am not sure we can infer ``a == b`` if ``a in (0, 2, 3, 4)`` and ``b in (0, 2, 3, 4)``. 
    
    table 'a'
    ```
    a1 a2
    1  2
    3  3
    4  5
    ```
    
    table 'b'
    ```
    b1 b2
    1  -1
    2  -2
    3  -4
    ```
    
    ```
    SELECT * FROM a, b WHERE a1 in (1, 2) AND b1 in (1, 2)
    -- 1 2 1 -1
    -- 1 2 2 -2
    ```
    ```
    SELECT * FROM a JOIN b ON a.a1 = b.b1 WHERE a1 in (1, 2) AND b1 in (1, 2)
    -- 1 2 1 -1
    ```
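    The two queries above can be replayed in plain Scala to make the difference concrete. This is a toy sketch that models the tables as in-memory tuples and does not use Spark; `InNotEqualitySketch` is a made-up name:
    
    ```scala
    object InNotEqualitySketch {
      // Tables from the example: a(a1, a2) and b(b1, b2).
      val a: Seq[(Int, Int)] = Seq((1, 2), (3, 3), (4, 5))
      val b: Seq[(Int, Int)] = Seq((1, -1), (2, -2), (3, -4))
      val inList: Set[Int] = Set(1, 2)
    
      // WHERE a1 IN (1, 2) AND b1 IN (1, 2): filters only, no join predicate.
      def crossWithFilters: Seq[(Int, Int, Int, Int)] =
        for {
          (a1, a2) <- a if inList(a1)
          (b1, b2) <- b if inList(b1)
        } yield (a1, a2, b1, b2)
    
      // The same filters plus the inferred predicate a1 = b1.
      def withInferredEquality: Seq[(Int, Int, Int, Int)] =
        crossWithFilters.filter { case (a1, _, b1, _) => a1 == b1 }
    
      def main(args: Array[String]): Unit = {
        println(crossWithFilters)     // List((1,2,1,-1), (1,2,2,-2))
        println(withInferredEquality) // List((1,2,1,-1))
      }
    }
    ```
    
    Both `a1` and `b1` satisfy the same `IN (1, 2)` list, yet the row pair `(1, 2)` survives the filters, so equality of the two columns does not follow from shared `IN` lists.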
    
    Am I missing anything?





[GitHub] spark pull request #18692: [SPARK-21417][SQL] Infer join conditions using pr...

Posted by aokolnychyi <gi...@git.apache.org>.
Github user aokolnychyi commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18692#discussion_r153066992
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
    @@ -152,3 +152,99 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
           if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
       }
     }
    +
    +/**
    + * A rule that eliminates CROSS joins by inferring join conditions from propagated constraints.
    + *
    + * The optimization is applicable only to CROSS joins. For other join types, adding inferred join
    + * conditions would potentially shuffle children as child node's partitioning won't satisfy the JOIN
    + * node's requirements which otherwise could have.
    + *
    + * For instance, if there is a CROSS join, where the left relation has 'a = 1' and the right
    + * relation has 'b = 1', the rule infers 'a = b' as a join predicate.
    + */
    +object EliminateCrossJoin extends Rule[LogicalPlan] with PredicateHelper {
    +
    +  def apply(plan: LogicalPlan): LogicalPlan = {
    +    if (SQLConf.get.constraintPropagationEnabled) {
    +      eliminateCrossJoin(plan)
    +    } else {
    +      plan
    +    }
    +  }
    +
    +  private def eliminateCrossJoin(plan: LogicalPlan): LogicalPlan = plan transform {
    +    case join@Join(leftPlan, rightPlan, Cross, None) =>
    +      val leftConstraints = join.constraints.filter(_.references.subsetOf(leftPlan.outputSet))
    +      val rightConstraints = join.constraints.filter(_.references.subsetOf(rightPlan.outputSet))
    +      val inferredJoinPredicates = inferJoinPredicates(leftConstraints, rightConstraints)
    +      val joinConditionOpt = inferredJoinPredicates.reduceOption(And)
    +      if (joinConditionOpt.isDefined) Join(leftPlan, rightPlan, Inner, joinConditionOpt) else join
    +  }
    +
    +  private def inferJoinPredicates(
    +      leftConstraints: Set[Expression],
    +      rightConstraints: Set[Expression]): Set[EqualTo] = {
    +
    +    // iterate through the left constraints and build a hash map that points semantically
    +    // equivalent expressions into attributes
    +    val emptyEquivalenceMap = Map.empty[SemanticExpression, Set[Attribute]]
    +    val equivalenceMap = leftConstraints.foldLeft(emptyEquivalenceMap) { case (map, constraint) =>
    +      constraint match {
    +        case EqualTo(attr: Attribute, expr: Expression) =>
    +          updateEquivalenceMap(map, attr, expr)
    +        case EqualTo(expr: Expression, attr: Attribute) =>
    +          updateEquivalenceMap(map, attr, expr)
    +        case _ => map
    +      }
    +    }
    +
    +    // iterate through the right constraints and infer join conditions using the equivalence map
    +    rightConstraints.foldLeft(Set.empty[EqualTo]) { case (joinConditions, constraint) =>
    +      constraint match {
    +        case EqualTo(attr: Attribute, expr: Expression) =>
    +          appendJoinConditions(attr, expr, equivalenceMap, joinConditions)
    +        case EqualTo(expr: Expression, attr: Attribute) =>
    +          appendJoinConditions(attr, expr, equivalenceMap, joinConditions)
    +        case _ => joinConditions
    +      }
    +    }
    +  }
    +
    +  private def updateEquivalenceMap(
    +      equivalenceMap: Map[SemanticExpression, Set[Attribute]],
    +      attr: Attribute,
    +      expr: Expression): Map[SemanticExpression, Set[Attribute]] = {
    +
    +    val equivalentAttrs = equivalenceMap.getOrElse(expr, Set.empty[Attribute])
    +    if (equivalentAttrs.contains(attr)) {
    +      equivalenceMap
    +    } else {
    +      equivalenceMap.updated(expr, equivalentAttrs + attr)
    +    }
    +  }
    +
    +  private def appendJoinConditions(
    +      attr: Attribute,
    +      expr: Expression,
    +      equivalenceMap: Map[SemanticExpression, Set[Attribute]],
    +      joinConditions: Set[EqualTo]): Set[EqualTo] = {
    +
    +    equivalenceMap.get(expr) match {
    +      case Some(equivalentAttrs) => joinConditions ++ equivalentAttrs.map(EqualTo(attr, _))
    +      case None => joinConditions
    +    }
    +  }
    +
    +  // the purpose of this class is to treat 'a === 1 and 1 === 'a as the same expressions
    +  implicit class SemanticExpression(private val expr: Expression) {
    --- End diff --
    
    @gatorsmile 
    
    I think we just need the case class inside ``EquivalentExpressions`` since we have to map all semantically equivalent expressions into a set of attributes (as opposed to mapping an expression into a set of equivalent expressions). 
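    The two passes in the diff can be modeled outside Catalyst. The first pass builds a map from each expression to the left attributes equal to it, and the second pass probes that map with the right constraints. This is a simplified sketch. Expressions are reduced to `Int` literals, so plain equality stands in for semantic equality, and `EqualTo` here is a hypothetical local case class, not Catalyst's:
    
    ```scala
    object EquivalenceMapSketch {
      // Hypothetical constraint `attr = value`; an Int literal stands in for an
      // arbitrary expression, so plain equality replaces semantic equality here.
      final case class EqualTo(attr: String, value: Int)
    
      def inferJoinPredicates(
          leftConstraints: Seq[EqualTo],
          rightConstraints: Seq[EqualTo]): Set[(String, String)] = {
        // Pass 1: map each expression to the left attributes known to equal it.
        val equivalenceMap: Map[Int, Set[String]] =
          leftConstraints.foldLeft(Map.empty[Int, Set[String]]) { (map, c) =>
            map.updated(c.value, map.getOrElse(c.value, Set.empty[String]) + c.attr)
          }
        // Pass 2: a right attribute equal to the same expression yields one
        // (leftAttr, rightAttr) join predicate per equivalent left attribute.
        rightConstraints.foldLeft(Set.empty[(String, String)]) { (acc, c) =>
          acc ++ equivalenceMap.getOrElse(c.value, Set.empty[String]).map(l => (l, c.attr))
        }
      }
    }
    ```
    
    For example, `t1.col1 = 1` on the left and `t2.col2 = 1` on the right yield the single pair `(t1.col1, t2.col2)`. A right constraint whose value never appears on the left contributes nothing.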
    
    I see two ways to go:
    
    1. Expose the case class inside ``EquivalentExpressions`` with minimum changes in the code base (e.g., using a companion object):
    
    ````
    object EquivalentExpressions {
    
      /**
       * Wrapper around an Expression that provides semantic equality.
       */
      implicit class SemanticExpr(private val e: Expression) {
        override def equals(o: Any): Boolean = o match {
          case other: SemanticExpr => e.semanticEquals(other.e)
          case _ => false
        }
    
        override def hashCode: Int = e.semanticHash()
      }
    }
    ````
    
    2. Keep ``EquivalentExpressions`` as it is and maintain a separate map from expressions to attributes in the proposed rule.
    
    Personally, I lean toward the first idea, since it might be useful to have ``SemanticExpr`` on its own. However, there may be other drawbacks that I have not thought of.
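    The idea behind ``SemanticExpr`` is that its equality and hash code follow semantic rather than structural equality. It can be illustrated without Catalyst. In this toy sketch, `Expr`, `Attr`, `Lit`, `Add`, and `canonicalize` are hypothetical stand-ins for `Expression`, `semanticEquals`, and `semanticHash`. The canonical form only reorders the operands of the commutative `Add`:
    
    ```scala
    object SemanticKeySketch {
      // Hypothetical mini expression tree (not Catalyst's Expression).
      sealed trait Expr
      final case class Attr(name: String) extends Expr
      final case class Lit(value: Int) extends Expr
      final case class Add(left: Expr, right: Expr) extends Expr
    
      // Canonical form: operands of the commutative Add are put in a fixed order.
      def canonicalize(e: Expr): Expr = e match {
        case Add(l, r) =>
          val (cl, cr) = (canonicalize(l), canonicalize(r))
          if (cl.toString <= cr.toString) Add(cl, cr) else Add(cr, cl)
        case other => other
      }
    
      // Equality and hash code follow the canonical form, playing the role
      // of semanticEquals / semanticHash in the proposed SemanticExpr.
      final class SemanticExpr(val e: Expr) {
        private lazy val canonical: Expr = canonicalize(e)
        override def equals(o: Any): Boolean = o match {
          case other: SemanticExpr => canonical == other.canonical
          case _ => false
        }
        override def hashCode: Int = canonical.hashCode
      }
    
      // Usage: a map keyed by `a + 1` is found via the lookup key `1 + a`.
      def lookupDemo: Option[Set[String]] = {
        val m = Map(new SemanticExpr(Add(Attr("a"), Lit(1))) -> Set("x"))
        m.get(new SemanticExpr(Add(Lit(1), Attr("a"))))
      }
    }
    ```
    
    Because `equals` and `hashCode` agree on the canonical form, the wrapper works as a key in an ordinary immutable `Map`. Catalyst's own canonicalization covers many more cases than this sketch does.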




[GitHub] spark pull request #18692: [SPARK-21417][SQL] Detect joind conditions via fi...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18692#discussion_r130252998
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
    @@ -152,3 +152,72 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
           if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
       }
     }
    +
    +/**
    + * A rule that uses propagated constraints to infer join conditions. The optimization is applicable
    + * only to inner joins.
    + *
    + * For instance, if there is a join, where the left relation has 'a = 1' and the right relation
    + * has 'b = 1', then the rule infers 'a = b' as a join predicate. Only semantically new predicates
    + * are appended to the existing join condition.
    + */
    +object InferJoinConditionsFromConstraints extends Rule[LogicalPlan] with PredicateHelper {
    +
    +  def apply(plan: LogicalPlan): LogicalPlan = {
    +    if (SQLConf.get.constraintPropagationEnabled) {
    +      inferJoinConditions(plan)
    +    } else {
    +      plan
    +    }
    +  }
    +
    +  private def inferJoinConditions(plan: LogicalPlan): LogicalPlan = plan transform {
    +    case join @ Join(left, right, Inner, conditionOpt) =>
    --- End diff --
    
    Should this match `InnerLike` (which covers both `Inner` and `Cross`) instead of only `Inner`?
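    For context, here is a simplified model of the join-type hierarchy. In Catalyst, `Inner` and `Cross` both extend `InnerLike`, so a single type pattern covers both. The sketch re-declares these types locally, plus a `LeftOuter` for contrast, so that it compiles on its own. It is not Catalyst's actual code:
    
    ```scala
    object InnerLikeSketch {
      // Local, simplified mirror of the join-type hierarchy.
      sealed abstract class JoinType
      sealed abstract class InnerLike extends JoinType
      case object Inner extends InnerLike
      case object Cross extends InnerLike
      case object LeftOuter extends JoinType
    
      // One type pattern matches both Inner and Cross.
      def isCandidate(joinType: JoinType): Boolean = joinType match {
        case _: InnerLike => true
        case _ => false
      }
    }
    ```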




[GitHub] spark pull request #18692: [SPARK-21417][SQL] Infer join conditions using pr...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18692#discussion_r153342778
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
    @@ -152,3 +152,99 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
           if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
       }
     }
    +
    +/**
    + * A rule that eliminates CROSS joins by inferring join conditions from propagated constraints.
    + *
    + * The optimization is applicable only to CROSS joins. For other join types, adding inferred join
    + * conditions would potentially shuffle children as child node's partitioning won't satisfy the JOIN
    + * node's requirements which otherwise could have.
    + *
    + * For instance, if there is a CROSS join, where the left relation has 'a = 1' and the right
    + * relation has 'b = 1', the rule infers 'a = b' as a join predicate.
    + */
    +object EliminateCrossJoin extends Rule[LogicalPlan] with PredicateHelper {
    +
    +  def apply(plan: LogicalPlan): LogicalPlan = {
    +    if (SQLConf.get.constraintPropagationEnabled) {
    +      eliminateCrossJoin(plan)
    +    } else {
    +      plan
    +    }
    +  }
    +
    +  private def eliminateCrossJoin(plan: LogicalPlan): LogicalPlan = plan transform {
    +    case join@Join(leftPlan, rightPlan, Cross, None) =>
    +      val leftConstraints = join.constraints.filter(_.references.subsetOf(leftPlan.outputSet))
    +      val rightConstraints = join.constraints.filter(_.references.subsetOf(rightPlan.outputSet))
    +      val inferredJoinPredicates = inferJoinPredicates(leftConstraints, rightConstraints)
    +      val joinConditionOpt = inferredJoinPredicates.reduceOption(And)
    +      if (joinConditionOpt.isDefined) Join(leftPlan, rightPlan, Inner, joinConditionOpt) else join
    +  }
    +
    +  private def inferJoinPredicates(
    +      leftConstraints: Set[Expression],
    +      rightConstraints: Set[Expression]): Set[EqualTo] = {
    +
    +    // iterate through the left constraints and build a hash map that points semantically
    +    // equivalent expressions into attributes
    +    val emptyEquivalenceMap = Map.empty[SemanticExpression, Set[Attribute]]
    +    val equivalenceMap = leftConstraints.foldLeft(emptyEquivalenceMap) { case (map, constraint) =>
    +      constraint match {
    +        case EqualTo(attr: Attribute, expr: Expression) =>
    +          updateEquivalenceMap(map, attr, expr)
    +        case EqualTo(expr: Expression, attr: Attribute) =>
    +          updateEquivalenceMap(map, attr, expr)
    +        case _ => map
    +      }
    +    }
    +
    +    // iterate through the right constraints and infer join conditions using the equivalence map
    +    rightConstraints.foldLeft(Set.empty[EqualTo]) { case (joinConditions, constraint) =>
    +      constraint match {
    +        case EqualTo(attr: Attribute, expr: Expression) =>
    +          appendJoinConditions(attr, expr, equivalenceMap, joinConditions)
    +        case EqualTo(expr: Expression, attr: Attribute) =>
    +          appendJoinConditions(attr, expr, equivalenceMap, joinConditions)
    +        case _ => joinConditions
    +      }
    +    }
    +  }
    +
    +  private def updateEquivalenceMap(
    +      equivalenceMap: Map[SemanticExpression, Set[Attribute]],
    +      attr: Attribute,
    +      expr: Expression): Map[SemanticExpression, Set[Attribute]] = {
    +
    +    val equivalentAttrs = equivalenceMap.getOrElse(expr, Set.empty[Attribute])
    +    if (equivalentAttrs.contains(attr)) {
    +      equivalenceMap
    +    } else {
    +      equivalenceMap.updated(expr, equivalentAttrs + attr)
    +    }
    +  }
    +
    +  private def appendJoinConditions(
    +      attr: Attribute,
    +      expr: Expression,
    +      equivalenceMap: Map[SemanticExpression, Set[Attribute]],
    +      joinConditions: Set[EqualTo]): Set[EqualTo] = {
    +
    +    equivalenceMap.get(expr) match {
    +      case Some(equivalentAttrs) => joinConditions ++ equivalentAttrs.map(EqualTo(attr, _))
    +      case None => joinConditions
    +    }
    +  }
    +
    +  // the purpose of this class is to treat 'a === 1 and 1 === 'a as the same expressions
    +  implicit class SemanticExpression(private val expr: Expression) {
    --- End diff --
    
    I did not check it carefully, but how about `ExpressionSet`? 
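    An `ExpressionSet`-style lookup ensures that a predicate already present in a semantically equivalent form (for example `b = a` versus `a = b`) is not added again. Below is a toy sketch of that filtering step, assuming attribute-to-attribute equalities only. `EqualTo` and `findNewPredicates` are hypothetical local definitions, and ordering the operands is a simplified stand-in for Catalyst's canonicalization:
    
    ```scala
    object NewPredicatesSketch {
      // Hypothetical equality predicate between two attributes.
      final case class EqualTo(left: String, right: String) {
        // Equality is symmetric, so order the operands to get a canonical key.
        def canonical: (String, String) =
          if (left <= right) (left, right) else (right, left)
      }
    
      // Keep only inferred predicates that are not already present in any form.
      def findNewPredicates(
          inferred: Set[EqualTo],
          existing: Seq[EqualTo]): Set[EqualTo] = {
        val seen = existing.map(_.canonical).toSet
        inferred.filterNot(p => seen.contains(p.canonical))
      }
    }
    ```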




[GitHub] spark issue #18692: [SPARK-21417][SQL] Detect joind conditions via filter ex...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18692
  
    **[Test build #79804 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/79804/testReport)** for PR 18692 at commit [`e67d4d3`](https://github.com/apache/spark/commit/e67d4d3c0bdf5cac4c6b17b50314984a2a6378d2).




[GitHub] spark issue #18692: [SPARK-21417][SQL] Infer join conditions using propagate...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on the issue:

    https://github.com/apache/spark/pull/18692
  
    LGTM 
    
    Thanks for your patience! It looks much better now. I really appreciate your contributions, and you are welcome to make more!
    
    Thanks! Merged to master.




[GitHub] spark issue #18692: [SPARK-21417][SQL] Infer join conditions using propagate...

Posted by tejasapatil <gi...@git.apache.org>.
Github user tejasapatil commented on the issue:

    https://github.com/apache/spark/pull/18692
  
    @cloud-fan: When the set of join keys is a superset of the child node's partitioning keys, it's possible to avoid a shuffle: https://github.com/apache/spark/pull/19054. This can help in two cases:
    - when users unknowingly join on extra columns in addition to the bucket columns
    - the one you mentioned (i.e., inferred conditions).
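    The superset condition can be stated as a one-line check. This is a toy sketch with column names as strings. `canReuseChildPartitioning` is a hypothetical name, not the code in PR 19054, and the sketch ignores that both children must also be partitioned compatibly (for example, with the same number of partitions):
    
    ```scala
    object ShuffleAvoidanceSketch {
      // A child hash-partitioned on `partitioningKeys` already co-locates rows that
      // agree on those keys. If every partitioning key is also a join key, rows that
      // match on all join keys also agree on the partitioning keys, so they already
      // share a partition and this child needs no extra shuffle.
      def canReuseChildPartitioning(
          joinKeys: Set[String],
          partitioningKeys: Set[String]): Boolean =
        partitioningKeys.nonEmpty && partitioningKeys.subsetOf(joinKeys)
    }
    ```
    
    Joining on an extra column, or on an inferred one, only enlarges `joinKeys`, so the check still passes.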





[GitHub] spark issue #18692: [SPARK-21417][SQL] Infer join conditions using propagate...

Posted by aokolnychyi <gi...@git.apache.org>.
Github user aokolnychyi commented on the issue:

    https://github.com/apache/spark/pull/18692
  
    I took a look at ``JoinSelection``. It seems we will not get ``BroadcastHashJoin`` or ``ShuffledHashJoin`` if we revert this rule.
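    Here is why the rule matters for planning. Hash-based physical joins need equi-join keys, and a cross join with only single-side filters has none. The toy sketch below is loosely modeled on the planner extracting equi-join keys before it considers a hash join. Every name in it is hypothetical:
    
    ```scala
    object JoinSelectionSketch {
      // Hypothetical predicate model: attribute equalities and everything else.
      sealed trait Predicate
      final case class EqualTo(left: String, right: String) extends Predicate
      final case class Other(sql: String) extends Predicate
    
      // Equi-join keys are equalities whose two sides come from different children.
      def equiJoinKeys(
          condition: Seq[Predicate],
          leftAttrs: Set[String],
          rightAttrs: Set[String]): Seq[(String, String)] =
        condition.collect {
          case EqualTo(l, r) if leftAttrs(l) && rightAttrs(r) => (l, r)
          case EqualTo(l, r) if leftAttrs(r) && rightAttrs(l) => (r, l)
        }
    
      // A hash-based join needs at least one equi-join key to hash on.
      def hashJoinPossible(keys: Seq[(String, String)]): Boolean = keys.nonEmpty
    }
    ```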




[GitHub] spark issue #18692: [SPARK-21417][SQL] Infer join conditions using propagate...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on the issue:

    https://github.com/apache/spark/pull/18692
  
    Sorry for the delay. @jiangxb1987 will submit a simple fix for the issue you mentioned. It will not be a perfect fix, but it partially resolves the issue. In the future, we need to move filter removal to a separate batch for cost-based optimization instead of doing it together with filter inference in the same rule-based optimization (RBO) batch.




[GitHub] spark issue #18692: [SPARK-21417][SQL] Infer join conditions using propagate...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18692
  
    Merged build finished. Test PASSed.




[GitHub] spark issue #18692: [SPARK-21417][SQL] Detect joind conditions via filter ex...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18692
  
    Merged build finished. Test FAILed.




[GitHub] spark issue #18692: [SPARK-21417][SQL] Infer join conditions using propagate...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on the issue:

    https://github.com/apache/spark/pull/18692
  
    Even if we use `BroadcastHashJoin` or `ShuffledHashJoin`, it does not help, because identical key values just cause unnecessary work in both cases, right?
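    Here is a toy sketch of that point. Suppose filters pin both join keys to the same constant, as with `a = 1` and `b = 1`, from which `a = b` is inferred. Then every right row falls into one hash bucket and every left row matches all of them. The hash join therefore produces exactly the cross product while still paying for building and probing the table. `hashJoin` is a simplified in-memory model, not Spark's implementation:
    
    ```scala
    object ConstantKeyHashJoinSketch {
      // Minimal in-memory hash join: build a hash table on the right side,
      // then probe it with every left row. Also returns the number of buckets.
      def hashJoin[L, R, K](left: Seq[L], right: Seq[R])(
          leftKey: L => K,
          rightKey: R => K): (Seq[(L, R)], Int) = {
        val table: Map[K, Seq[R]] = right.groupBy(rightKey)
        val matches = for {
          l <- left
          r <- table.getOrElse(leftKey(l), Seq.empty[R])
        } yield (l, r)
        (matches, table.size)
      }
    }
    ```
    
    With three left rows and two right rows that all have key `1`, the result has 3 * 2 = 6 pairs from a single bucket, the same rows a cross join would return.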




[GitHub] spark issue #18692: [SPARK-21417][SQL] Infer join conditions using propagate...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18692
  
    **[Test build #84177 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84177/testReport)** for PR 18692 at commit [`3e090f9`](https://github.com/apache/spark/commit/3e090f9e4e6be8efb814e2e809a0718a0c670718).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark issue #18692: [SPARK-21417][SQL] Infer join conditions using propagate...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18692
  
    Merged build finished. Test PASSed.




[GitHub] spark issue #18692: [SPARK-21417][SQL] Infer join conditions using propagate...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18692
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84176/




[GitHub] spark issue #18692: [SPARK-21417][SQL] Detect joind conditions via filter ex...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18692
  
    **[Test build #80056 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/80056/testReport)** for PR 18692 at commit [`915dc7e`](https://github.com/apache/spark/commit/915dc7ecb1891ce7387e49b8eab915049bd34f93).




[GitHub] spark issue #18692: [SPARK-21417][SQL] Infer join conditions using propagate...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on the issue:

    https://github.com/apache/spark/pull/18692
  
    @aokolnychyi Thanks for finding the non-convergent case! Let me see how to fix it.




[GitHub] spark issue #18692: [SPARK-21417][SQL] Infer join conditions using propagate...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18692
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82891/




[GitHub] spark issue #18692: [SPARK-21417][SQL] Infer join conditions using propagate...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18692
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84351/




[GitHub] spark pull request #18692: [SPARK-21417][SQL] Infer join conditions using pr...

Posted by aokolnychyi <gi...@git.apache.org>.
Github user aokolnychyi commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18692#discussion_r154164912
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSet.scala ---
    @@ -27,6 +27,8 @@ object ExpressionSet {
         expressions.foreach(set.add)
         set
       }
    +
    +  val empty: ExpressionSet = ExpressionSet(Nil)
    --- End diff --
    
    I thought that writing ``ExpressionSet.empty`` would be more readable than ``ExpressionSet(Nil)``. Usually, mutable collections have ``def empty()`` and immutable ones have separate objects that represent empty collections (e.g., ``Nil``, ``Stream.Empty``). I defined ``val empty`` since ``ExpressionSet`` is immutable.
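    Here is a toy sketch of the companion-object pattern described above. It uses a hypothetical immutable `TinySet` rather than `ExpressionSet`. Because instances never change, a single shared `empty` value is safe. In this sketch, `empty` is built with the constructor directly because `apply` itself folds from `empty`:
    
    ```scala
    // Hypothetical immutable collection, used only to show the companion pattern.
    final class TinySet private (private val elems: Vector[Int]) {
      def +(x: Int): TinySet = if (elems.contains(x)) this else new TinySet(elems :+ x)
      def size: Int = elems.size
      def isEmpty: Boolean = elems.isEmpty
    }
    
    object TinySet {
      def apply(xs: Seq[Int]): TinySet = xs.foldLeft(empty)(_ + _)
      // One shared instance is safe because TinySet is immutable.
      val empty: TinySet = new TinySet(Vector.empty)
    }
    ```
    
    `TinySet(Nil)` returns the shared instance, since folding over an empty sequence yields the initial `empty`.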




[GitHub] spark issue #18692: [SPARK-21417][SQL] Infer join conditions using propagate...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18692
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/80362/




[GitHub] spark pull request #18692: [SPARK-21417][SQL] Infer join conditions using pr...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18692#discussion_r152415250
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
    @@ -152,3 +152,79 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
           if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
       }
     }
    +
    +/**
    + * A rule that uses propagated constraints to infer join conditions. The optimization is applicable
    + * only to CROSS joins. For other join types, adding inferred join conditions would potentially
    + * shuffle children as child node's partitioning won't satisfy the JOIN node's requirements
    + * which otherwise could have.
    + *
    + * For instance, if there is a CROSS join, where the left relation has 'a = 1' and the right
    + * relation has 'b = 1', then the rule infers 'a = b' as a join predicate.
    + */
    +object InferJoinConditionsFromConstraints extends Rule[LogicalPlan] with PredicateHelper {
    +
    +  def apply(plan: LogicalPlan): LogicalPlan = {
    +    if (SQLConf.get.constraintPropagationEnabled) {
    +      inferJoinConditions(plan)
    +    } else {
    +      plan
    +    }
    +  }
    +
    +  private def inferJoinConditions(plan: LogicalPlan): LogicalPlan = plan transform {
    +    case join @ Join(left, right, Cross, conditionOpt) =>
    +
    +      val rightEqualToPredicates = join.constraints.collect {
    +        case equalTo @ EqualTo(attr: Attribute, _) if isAttributeContainedInPlan(attr, right) =>
    +          equalTo
    +        case equalTo @ EqualTo(_, attr: Attribute) if isAttributeContainedInPlan(attr, right) =>
    +          equalTo
    +      }
    +
    +      val inferredJoinPredicates = join.constraints.flatMap {
    +        case EqualTo(attr: Attribute, equivalentExpr) if isAttributeContainedInPlan(attr, left) =>
    +          collectJoinPredicates(attr, equivalentExpr, right, rightEqualToPredicates)
    +        case EqualTo(equivalentExpr, attr: Attribute) if isAttributeContainedInPlan(attr, left) =>
    +          collectJoinPredicates(attr, equivalentExpr, right, rightEqualToPredicates)
    +        case _ => Nil
    +      }
    +
    +      val newConditionOpt = conditionOpt match {
    +        case Some(condition) =>
    +          val existingPredicates = splitConjunctivePredicates(condition)
    +          val newPredicates = findNewPredicates(inferredJoinPredicates, existingPredicates)
    +          if (newPredicates.nonEmpty) Some(And(newPredicates.reduce(And), condition)) else None
    +        case None =>
    +          inferredJoinPredicates.reduceOption(And)
    +      }
    +      if (newConditionOpt.isDefined) Join(left, right, Inner, newConditionOpt) else join
    +  }
    +
    +  private def collectJoinPredicates(
    +      leftAttr: Attribute,
    +      equivalentExpr: Expression,
    +      rightPlan: LogicalPlan,
    +      rightPlanEqualToPredicates: Set[EqualTo]): Set[EqualTo] = {
    +
    +    rightPlanEqualToPredicates.collect {
    +      case EqualTo(attr: Attribute, expr)
    +        if expr.semanticEquals(equivalentExpr) && isAttributeContainedInPlan(attr, rightPlan) =>
    +        EqualTo(leftAttr, attr)
    +      case EqualTo(expr, attr: Attribute)
    +        if expr.semanticEquals(equivalentExpr) && isAttributeContainedInPlan(attr, rightPlan) =>
    +        EqualTo(leftAttr, attr)
    +    }
    +  }
    +
    +  private def isAttributeContainedInPlan(attr: Attribute, logicalPlan: LogicalPlan): Boolean = {
    +    attr.references.subsetOf(logicalPlan.outputSet)
    +  }
    +
    +  private def findNewPredicates(
    +      inferredPredicates: Set[EqualTo],
    +      existingPredicates: Seq[Expression]) : Set[EqualTo] = {
    --- End diff --
    
    `existingPredicates: Seq[Expression]) : Set[EqualTo] = {`
    -> `existingPredicates: Seq[Expression]): Set[EqualTo] = {`




[GitHub] spark issue #18692: [SPARK-21417][SQL] Detect joind conditions via filter ex...

Posted by aokolnychyi <gi...@git.apache.org>.
Github user aokolnychyi commented on the issue:

    https://github.com/apache/spark/pull/18692
  
    @cloud-fan, which rule do you mean? `PushPredicateThroughJoin` seems to be the closest in logic, but it has a slightly different purpose and does not cover this use case. In fact, I used the proposed rule in conjunction with `PushPredicateThroughJoin` in the tests.
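    To illustrate the difference, here is a toy sketch of the conjunct split that a predicate push-down rule performs. It loosely follows the left-only / right-only / common split used when pushing predicates through a join. `Pred` and `split` are hypothetical simplifications. For `t1.col1 = 1 AND t2.col2 = 1`, both conjuncts are pushed into their own child and nothing remains at the join, so push-down alone never produces `t1.col1 = t2.col2`:
    
    ```scala
    object PushDownSketch {
      // A predicate together with the attributes it references (hypothetical model).
      final case class Pred(sql: String, refs: Set[String])
    
      // Conjuncts that only touch one child can be evaluated below the join;
      // the remaining ones stay at the join as common conditions.
      def split(
          preds: Seq[Pred],
          leftAttrs: Set[String],
          rightAttrs: Set[String]): (Seq[Pred], Seq[Pred], Seq[Pred]) = {
        val (leftOnly, rest) = preds.partition(_.refs.subsetOf(leftAttrs))
        val (rightOnly, common) = rest.partition(_.refs.subsetOf(rightAttrs))
        (leftOnly, rightOnly, common)
      }
    }
    ```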




[GitHub] spark issue #18692: [SPARK-21417][SQL] Detect joind conditions via filter ex...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on the issue:

    https://github.com/apache/spark/pull/18692
  
    We need to use the propagated constraints to infer the join conditions. 




[GitHub] spark pull request #18692: [SPARK-21417][SQL] Infer join conditions using pr...

Posted by gengliangwang <gi...@git.apache.org>.
Github user gengliangwang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18692#discussion_r144734911
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
    @@ -152,3 +152,71 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
           if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
       }
     }
    +
    +/**
    + * A rule that uses propagated constraints to infer join conditions. The optimization is applicable
    + * only to CROSS joins.
    + *
    + * For instance, if there is a CROSS join, where the left relation has 'a = 1' and the right
    + * relation has 'b = 1', then the rule infers 'a = b' as a join predicate.
    + */
    +object InferJoinConditionsFromConstraints extends Rule[LogicalPlan] with PredicateHelper {
    +
    +  def apply(plan: LogicalPlan): LogicalPlan = {
    +    if (SQLConf.get.constraintPropagationEnabled) {
    +      inferJoinConditions(plan)
    +    } else {
    +      plan
    +    }
    +  }
    +
    +  private def inferJoinConditions(plan: LogicalPlan): LogicalPlan = plan transform {
    +    case join @ Join(left, right, Cross, conditionOpt) =>
    +      val leftConstraints = join.constraints.filter(_.references.subsetOf(left.outputSet))
    +      val rightConstraints = join.constraints.filter(_.references.subsetOf(right.outputSet))
    --- End diff --
    
    Yes, you are right.




[GitHub] spark pull request #18692: [SPARK-21417][SQL] Infer join conditions using pr...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18692#discussion_r154256590
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
    @@ -152,3 +153,62 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
           if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
       }
     }
    +
    +/**
    + * A rule that eliminates CROSS joins by inferring join conditions from propagated constraints.
    + *
    + * The optimization is applicable only to CROSS joins. For other join types, adding inferred join
    --- End diff --
    
    can we apply this optimization to all joins after https://github.com/apache/spark/pull/19054?




[GitHub] spark issue #18692: [SPARK-21417][SQL] Infer join conditions using propagate...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18692
  
    **[Test build #81390 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81390/testReport)** for PR 18692 at commit [`cfeae46`](https://github.com/apache/spark/commit/cfeae46766a6ccb1b1a0113fe41cdb52b16897f3).




[GitHub] spark issue #18692: [SPARK-21417][SQL] Infer join conditions using propagate...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18692
  
    **[Test build #84176 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84176/testReport)** for PR 18692 at commit [`ab7d966`](https://github.com/apache/spark/commit/ab7d966b9fa42c8852da4f3a196ab891b845c1b8).




[GitHub] spark pull request #18692: [SPARK-21417][SQL] Infer join conditions using pr...

Posted by aokolnychyi <gi...@git.apache.org>.
Github user aokolnychyi commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18692#discussion_r130662925
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
    @@ -152,3 +152,72 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
           if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
       }
     }
    +
    +/**
    + * A rule that uses propagated constraints to infer join conditions. The optimization is applicable
    + * only to inner joins.
    + *
    + * For instance, if there is a join, where the left relation has 'a = 1' and the right relation
    + * has 'b = 1', then the rule infers 'a = b' as a join predicate. Only semantically new predicates
    + * are appended to the existing join condition.
    + */
    +object InferJoinConditionsFromConstraints extends Rule[LogicalPlan] with PredicateHelper {
    +
    +  def apply(plan: LogicalPlan): LogicalPlan = {
    +    if (SQLConf.get.constraintPropagationEnabled) {
    +      inferJoinConditions(plan)
    +    } else {
    +      plan
    +    }
    +  }
    +
    +  private def inferJoinConditions(plan: LogicalPlan): LogicalPlan = plan transform {
    +    case join @ Join(left, right, Inner, conditionOpt) =>
    --- End diff --
    
    I also thought about this but decided to start with a smaller scope. The motivation was that `"SELECT * FROM t1, t2"` is resolved into an Inner Join and one has to explicitly use the Cross Join syntax to allow cartesian products. I was not sure if it was OK to replace an explicit Cross Join with a join of a different type. Semantically, we can have `InnerLike` here.
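    A minimal sketch of the scope question above, using a hypothetical, simplified mirror of Catalyst's join-type hierarchy (the `JoinType`, `InnerLike`, `Inner`, `Cross` and `LeftOuter` definitions below are stand-ins, not the real classes). Matching on `InnerLike` would cover both plain inner joins and explicit CROSS joins, whereas matching on `Cross` alone keeps the rule limited to explicit cartesian products.

    ```scala
    // Hypothetical, simplified mirror of a join-type hierarchy (not Catalyst's classes).
    sealed trait JoinType
    sealed trait InnerLike extends JoinType { def explicitCartesian: Boolean }
    case object Inner extends InnerLike { val explicitCartesian = false }
    case object Cross extends InnerLike { val explicitCartesian = true }
    case object LeftOuter extends JoinType

    // Narrow scope: only explicit CROSS joins are rewritten.
    def appliesToCrossOnly(jt: JoinType): Boolean = jt match {
      case Cross => true
      case _     => false
    }

    // Wider scope: any inner-like join (Inner or Cross) is a candidate.
    def appliesToInnerLike(jt: JoinType): Boolean = jt match {
      case _: InnerLike => true
      case _            => false
    }
    ```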




[GitHub] spark pull request #18692: [SPARK-21417][SQL] Infer join conditions using pr...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18692#discussion_r137111457
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
    @@ -152,3 +152,71 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
           if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
       }
     }
    +
    +/**
    + * A rule that uses propagated constraints to infer join conditions. The optimization is applicable
    + * only to CROSS joins.
    + *
    + * For instance, if there is a CROSS join, where the left relation has 'a = 1' and the right
    + * relation has 'b = 1', then the rule infers 'a = b' as a join predicate.
    + */
    +object InferJoinConditionsFromConstraints extends Rule[LogicalPlan] with PredicateHelper {
    --- End diff --
    
    Instead of adding a new rule, we should improve the rule `InferFiltersFromConstraints`.




[GitHub] spark pull request #18692: [SPARK-21417][SQL] Infer join conditions using pr...

Posted by tejasapatil <gi...@git.apache.org>.
Github user tejasapatil commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18692#discussion_r136868330
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
    @@ -152,3 +152,71 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
           if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
       }
     }
    +
    +/**
    + * A rule that uses propagated constraints to infer join conditions. The optimization is applicable
    + * only to CROSS joins.
    --- End diff --
    
    Can you also mention the reason why we are restricting this to cross joins only?
    
    ```
    For other join types, adding inferred join conditions would potentially shuffle children as child node's partitioning won't satisfy the JOIN node's requirements which otherwise could have.
    ```





[GitHub] spark issue #18692: [SPARK-21417][SQL] Infer join conditions using propagate...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18692
  
    **[Test build #82892 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82892/testReport)** for PR 18692 at commit [`b69185c`](https://github.com/apache/spark/commit/b69185c2a7466b69a3f244a257449dbf1dd0ee21).




[GitHub] spark issue #18692: [SPARK-21417][SQL] Infer join conditions using propagate...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18692
  
    Merged build finished. Test PASSed.




[GitHub] spark issue #18692: [SPARK-21417][SQL] Infer join conditions using propagate...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18692
  
    Build finished. Test FAILed.




[GitHub] spark issue #18692: [SPARK-21417][SQL] Detect joind conditions via filter ex...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on the issue:

    https://github.com/apache/spark/pull/18692
  
    BTW, your PR title and description are out of date.




[GitHub] spark issue #18692: [SPARK-21417][SQL] Infer join conditions using propagate...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on the issue:

    https://github.com/apache/spark/pull/18692
  
    Will do it. Thanks!




[GitHub] spark issue #18692: [SPARK-21417][SQL] Infer join conditions using propagate...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on the issue:

    https://github.com/apache/spark/pull/18692
  
    cc @gengliangwang Review this? 




[GitHub] spark pull request #18692: [SPARK-21417][SQL] Infer join conditions using pr...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18692#discussion_r153083496
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
    @@ -152,3 +152,99 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
           if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
       }
     }
    +
    +/**
    + * A rule that eliminates CROSS joins by inferring join conditions from propagated constraints.
    + *
    + * The optimization is applicable only to CROSS joins. For other join types, adding inferred join
    + * conditions would potentially shuffle children as child node's partitioning won't satisfy the JOIN
    + * node's requirements which otherwise could have.
    + *
    + * For instance, if there is a CROSS join, where the left relation has 'a = 1' and the right
    + * relation has 'b = 1', the rule infers 'a = b' as a join predicate.
    + */
    +object EliminateCrossJoin extends Rule[LogicalPlan] with PredicateHelper {
    +
    +  def apply(plan: LogicalPlan): LogicalPlan = {
    +    if (SQLConf.get.constraintPropagationEnabled) {
    +      eliminateCrossJoin(plan)
    +    } else {
    +      plan
    +    }
    +  }
    +
    +  private def eliminateCrossJoin(plan: LogicalPlan): LogicalPlan = plan transform {
    +    case join@Join(leftPlan, rightPlan, Cross, None) =>
    +      val leftConstraints = join.constraints.filter(_.references.subsetOf(leftPlan.outputSet))
    +      val rightConstraints = join.constraints.filter(_.references.subsetOf(rightPlan.outputSet))
    +      val inferredJoinPredicates = inferJoinPredicates(leftConstraints, rightConstraints)
    +      val joinConditionOpt = inferredJoinPredicates.reduceOption(And)
    +      if (joinConditionOpt.isDefined) Join(leftPlan, rightPlan, Inner, joinConditionOpt) else join
    +  }
    +
    +  private def inferJoinPredicates(
    +      leftConstraints: Set[Expression],
    +      rightConstraints: Set[Expression]): Set[EqualTo] = {
    +
    +    // iterate through the left constraints and build a hash map that points semantically
    +    // equivalent expressions into attributes
    +    val emptyEquivalenceMap = Map.empty[SemanticExpression, Set[Attribute]]
    +    val equivalenceMap = leftConstraints.foldLeft(emptyEquivalenceMap) { case (map, constraint) =>
    +      constraint match {
    +        case EqualTo(attr: Attribute, expr: Expression) =>
    +          updateEquivalenceMap(map, attr, expr)
    +        case EqualTo(expr: Expression, attr: Attribute) =>
    +          updateEquivalenceMap(map, attr, expr)
    +        case _ => map
    +      }
    +    }
    +
    +    // iterate through the right constraints and infer join conditions using the equivalence map
    +    rightConstraints.foldLeft(Set.empty[EqualTo]) { case (joinConditions, constraint) =>
    +      constraint match {
    +        case EqualTo(attr: Attribute, expr: Expression) =>
    +          appendJoinConditions(attr, expr, equivalenceMap, joinConditions)
    +        case EqualTo(expr: Expression, attr: Attribute) =>
    +          appendJoinConditions(attr, expr, equivalenceMap, joinConditions)
    +        case _ => joinConditions
    +      }
    +    }
    +  }
    +
    +  private def updateEquivalenceMap(
    +      equivalenceMap: Map[SemanticExpression, Set[Attribute]],
    +      attr: Attribute,
    +      expr: Expression): Map[SemanticExpression, Set[Attribute]] = {
    +
    +    val equivalentAttrs = equivalenceMap.getOrElse(expr, Set.empty[Attribute])
    +    if (equivalentAttrs.contains(attr)) {
    +      equivalenceMap
    +    } else {
    +      equivalenceMap.updated(expr, equivalentAttrs + attr)
    +    }
    +  }
    +
    +  private def appendJoinConditions(
    +      attr: Attribute,
    +      expr: Expression,
    +      equivalenceMap: Map[SemanticExpression, Set[Attribute]],
    +      joinConditions: Set[EqualTo]): Set[EqualTo] = {
    +
    +    equivalenceMap.get(expr) match {
    +      case Some(equivalentAttrs) => joinConditions ++ equivalentAttrs.map(EqualTo(attr, _))
    +      case None => joinConditions
    +    }
    +  }
    +
    +  // the purpose of this class is to treat 'a === 1 and 1 === 'a as the same expressions
    +  implicit class SemanticExpression(private val expr: Expression) {
    --- End diff --
    
    How about building a new class to process all the cases similar to this one?
    
    An `Attribute` is also an `Expression`. Internally, it would still be a hash map: `mutable.HashMap.empty[SemanticEqualExpr, mutable.MutableList[Expression]]`
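    A minimal sketch of the suggested design, assuming a toy expression model (`Expr`, `Attr`, `Lit`, `Rand`, `Eq` and the two functions are hypothetical, not Catalyst's API). Case-class equality stands in for Catalyst's semantic equality, and matching both `Eq(attr, expr)` and `Eq(expr, attr)` plays the role of the `SemanticExpression` wrapper, so `a = 1` and `1 = a` land in the same bucket. Non-deterministic expressions are skipped when the map is built.

    ```scala
    // Hypothetical toy expression model; not Catalyst's Expression API.
    sealed trait Expr { def deterministic: Boolean = true }
    final case class Attr(name: String) extends Expr
    final case class Lit(value: Int) extends Expr
    final case class Rand(seed: Long) extends Expr { override def deterministic: Boolean = false }
    final case class Eq(left: Expr, right: Expr) extends Expr

    // Map each deterministic expression to the left-side attributes constrained to equal it.
    // Matching both orientations means `a = 1` and `1 = a` share one key.
    def buildEquivalenceMap(leftConstraints: Set[Expr]): Map[Expr, Set[Attr]] =
      leftConstraints.foldLeft(Map.empty[Expr, Set[Attr]]) { (map, constraint) =>
        constraint match {
          case Eq(attr: Attr, expr) if expr.deterministic =>
            map.updated(expr, map.getOrElse(expr, Set.empty[Attr]) + attr)
          case Eq(expr, attr: Attr) if expr.deterministic =>
            map.updated(expr, map.getOrElse(expr, Set.empty[Attr]) + attr)
          case _ => map
        }
      }

    // For every right-side `b = expr` (either orientation), pair `b` with each left
    // attribute known to equal the same expression.
    def inferJoinPredicates(leftConstraints: Set[Expr], rightConstraints: Set[Expr]): Set[Eq] = {
      val equivalence = buildEquivalenceMap(leftConstraints)
      rightConstraints.flatMap {
        case Eq(attr: Attr, expr) => equivalence.getOrElse(expr, Set.empty[Attr]).map(l => Eq(l, attr))
        case Eq(expr, attr: Attr) => equivalence.getOrElse(expr, Set.empty[Attr]).map(l => Eq(l, attr))
        case _                    => Set.empty[Eq]
      }
    }
    ```

    With left constraint `a = 1` and right constraint `1 = b`, the sketch infers `a = b`. With `a = rand(0)` and `b = rand(0)` it infers nothing, even though the two toy expressions compare equal, because the deterministic guard drops them first.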




[GitHub] spark issue #18692: [SPARK-21417][SQL] Infer join conditions using propagate...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18692
  
    **[Test build #82892 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82892/testReport)** for PR 18692 at commit [`b69185c`](https://github.com/apache/spark/commit/b69185c2a7466b69a3f244a257449dbf1dd0ee21).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark issue #18692: [SPARK-21417][SQL] Infer join conditions using propagate...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on the issue:

    https://github.com/apache/spark/pull/18692
  
    @aokolnychyi After rethinking it, we might need to revert this PR. Although it converts a CROSS join into an inner join, it does not improve performance. What do you think?




[GitHub] spark issue #18692: [SPARK-21417][SQL] Infer join conditions using propagate...

Posted by Aklakan <gi...@git.apache.org>.
Github user Aklakan commented on the issue:

    https://github.com/apache/spark/pull/18692
  
    Hi @aokolnychyi, a note on @SimonBin's comment (I am his colleague):
    
    > The initial solution handled your case but then there was a decision to restrict the proposed rule to cross joins only.
    
    SimonBin's example appears to work when `spark.sql.crossJoin.enabled` is additionally set, i.e. `spark.conf.set("spark.sql.crossJoin.enabled", "true")`.
    
    We will investigate further how far this gets us.




[GitHub] spark issue #18692: [SPARK-21417][SQL] Infer join conditions using propagate...

Posted by aokolnychyi <gi...@git.apache.org>.
Github user aokolnychyi commented on the issue:

    https://github.com/apache/spark/pull/18692
  
    Sure, if you guys think it does not give any performance benefits, then let's revert it.
    
    I also had similar concerns, but my understanding was that having an inner join with some equality condition can be beneficial when the physical plan is generated. In other words, Spark should be able to select a more efficient join implementation. I am not sure how it works right now, but previously, without any equality condition, you could only get `BroadcastNestedLoopJoin` or `CartesianProduct`. Again, that was my assumption based on what I remember.
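    A toy illustration of the point about equality conditions (these are not Spark's physical operators; `Row`, `nestedLoopJoin` and `hashJoin` are hypothetical). Without an equi-join key every pair of rows has to be compared, while an equality key lets one side be hashed once and probed per row. Whether Spark's planner actually picks a cheaper strategy for the rewritten plan is the open question in this thread.

    ```scala
    // Toy rows: (key, payload). Purely illustrative.
    final case class Row(key: Int, payload: String)

    // Without an equality key, every left row is checked against every right row.
    def nestedLoopJoin(left: Seq[Row], right: Seq[Row])(cond: (Row, Row) => Boolean): Seq[(Row, Row)] =
      for { l <- left; r <- right if cond(l, r) } yield (l, r)

    // With an equality key, the right side is grouped by key once and probed per left row.
    def hashJoin(left: Seq[Row], right: Seq[Row]): Seq[(Row, Row)] = {
      val buildSide: Map[Int, Seq[Row]] = right.groupBy(_.key)
      for { l <- left; r <- buildSide.getOrElse(l.key, Seq.empty) } yield (l, r)
    }
    ```

    Both functions return the same pairs for an equality condition; the difference is that the hash version only touches matching rows after the build step.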




[GitHub] spark pull request #18692: [SPARK-21417][SQL] Infer join conditions using pr...

Posted by aokolnychyi <gi...@git.apache.org>.
Github user aokolnychyi commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18692#discussion_r137343500
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
    @@ -152,3 +152,71 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
           if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
       }
     }
    +
    +/**
    + * A rule that uses propagated constraints to infer join conditions. The optimization is applicable
    + * only to CROSS joins.
    + *
    + * For instance, if there is a CROSS join, where the left relation has 'a = 1' and the right
    + * relation has 'b = 1', then the rule infers 'a = b' as a join predicate.
    + */
    +object InferJoinConditionsFromConstraints extends Rule[LogicalPlan] with PredicateHelper {
    +
    +  def apply(plan: LogicalPlan): LogicalPlan = {
    +    if (SQLConf.get.constraintPropagationEnabled) {
    +      inferJoinConditions(plan)
    +    } else {
    +      plan
    +    }
    +  }
    +
    +  private def inferJoinConditions(plan: LogicalPlan): LogicalPlan = plan transform {
    +    case join @ Join(left, right, Cross, conditionOpt) =>
    +      val leftConstraints = join.constraints.filter(_.references.subsetOf(left.outputSet))
    +      val rightConstraints = join.constraints.filter(_.references.subsetOf(right.outputSet))
    +      val inferredJoinPredicates = inferJoinPredicates(leftConstraints, rightConstraints)
    +
    +      val newConditionOpt = conditionOpt match {
    +        case Some(condition) =>
    +          val existingPredicates = splitConjunctivePredicates(condition)
    +          val newPredicates = findNewPredicates(inferredJoinPredicates, existingPredicates)
    +          if (newPredicates.nonEmpty) Some(And(newPredicates.reduce(And), condition)) else None
    +        case None =>
    +          inferredJoinPredicates.reduceOption(And)
    +      }
    +      if (newConditionOpt.isDefined) Join(left, right, Inner, newConditionOpt) else join
    --- End diff --
    
    And what about CROSS joins with join conditions? Not sure if they will benefit from the proposed rule, but it is better to ask.
    
    ```
    Seq((1, 2)).toDF("col1", "col2").write.saveAsTable("t1")
    Seq((1, 2)).toDF("col1", "col2").write.saveAsTable("t2")
    val df = spark.sql("SELECT * FROM t1 CROSS JOIN t2 ON t1.col1 >= t2.col1 WHERE t1.col1 = 1 AND t2.col1 = 1")
    df.explain(true)
    == Optimized Logical Plan ==
    Join Cross, (col1#40 >= col1#42)
    :- Filter (isnotnull(col1#40) && (col1#40 = 1))
    :  +- Relation[col1#40,col2#41] parquet
    +- Filter (isnotnull(col1#42) && (col1#42 = 1))
       +- Relation[col1#42,col2#43] parquet
    ```
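    A minimal sketch of how the diff above merges inferred predicates into an existing condition, using a hypothetical toy predicate model (`Pred`, `EqP`, `GtEq`, `AndP`, `splitConjuncts`, `mergeCondition` loosely mirror `splitConjunctivePredicates` and `findNewPredicates` but are not Catalyst code). For the query in this comment, the constants on both sides would let the rule infer `t1.col1 = t2.col1`, which is then ANDed onto `t1.col1 >= t2.col1`. As in the diff, `None` is returned when nothing new is inferred, so the join is left untouched.

    ```scala
    // Hypothetical toy predicate model; not Catalyst's API.
    sealed trait Pred
    final case class EqP(l: String, r: String) extends Pred
    final case class GtEq(l: String, r: String) extends Pred
    final case class AndP(l: Pred, r: Pred) extends Pred

    // Flatten nested ANDs into a list of conjuncts.
    def splitConjuncts(p: Pred): Seq[Pred] = p match {
      case AndP(l, r) => splitConjuncts(l) ++ splitConjuncts(r)
      case other      => Seq(other)
    }

    // Treat `a = b` and `b = a` as the same predicate.
    def sameEquality(x: EqP, y: Pred): Boolean = y match {
      case EqP(l, r) => (x.l == l && x.r == r) || (x.l == r && x.r == l)
      case _         => false
    }

    // Keep only inferred equalities not already in the condition, then AND them onto it.
    def mergeCondition(inferred: Seq[EqP], existing: Option[Pred]): Option[Pred] = existing match {
      case None => inferred.reduceOption[Pred](AndP(_, _))
      case Some(cond) =>
        val conjuncts = splitConjuncts(cond)
        val fresh = inferred.filterNot(p => conjuncts.exists(sameEquality(p, _)))
        fresh.reduceOption[Pred](AndP(_, _)).map(AndP(_, cond))
    }
    ```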




[GitHub] spark issue #18692: [SPARK-21417][SQL] Infer join conditions using propagate...

Posted by tejasapatil <gi...@git.apache.org>.
Github user tejasapatil commented on the issue:

    https://github.com/apache/spark/pull/18692
  
    Can we restrict this to cartesian products ONLY? One clear downside of doing this for other joins is that it can add a shuffle for bucketing queries and for subqueries in general: after the inferred join conditions are added, the child node's partitioning might NOT satisfy the JOIN node's requirements, which it otherwise could have.
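    A deliberately simplified model of the shuffle concern, under the assumption (ours, not a statement of Spark's exact planning logic) that a shuffle join needs each child hash-partitioned on exactly its join keys, in order. `HashPartitioned` and `needsShuffle` are hypothetical names.

    ```scala
    // Hypothetical model of a child's existing hash partitioning (e.g. from bucketing).
    final case class HashPartitioned(columns: Seq[String])

    // Simplifying assumption: the join needs the child hash-partitioned on exactly its
    // join keys, in the same order; any mismatch means the child must be re-shuffled.
    def needsShuffle(child: HashPartitioned, joinKeys: Seq[String]): Boolean =
      child.columns != joinKeys

    val bucketedOnA  = HashPartitioned(Seq("a"))
    val originalKeys = Seq("a")       // join condition a = x
    val widenedKeys  = Seq("a", "b")  // after an inferred b = y is added
    ```

    Under this model, a child bucketed on `a` matches the original keys, but once an inferred predicate widens the keys to `Seq("a", "b")` the existing partitioning no longer matches and a shuffle is reported.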




[GitHub] spark issue #18692: [SPARK-21417][SQL] Infer join conditions using propagate...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18692
  
    **[Test build #82891 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82891/testReport)** for PR 18692 at commit [`e0e6ad3`](https://github.com/apache/spark/commit/e0e6ad381fc6f9b0b957e91e7c7df35207190021).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark issue #18692: [SPARK-21417][SQL] Infer join conditions using propagate...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18692
  
    **[Test build #80362 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/80362/testReport)** for PR 18692 at commit [`281f955`](https://github.com/apache/spark/commit/281f9555226f62e99d1143295f5bd364fff55fc0).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark issue #18692: [SPARK-21417][SQL] Infer join conditions using propagate...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18692
  
    **[Test build #80362 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/80362/testReport)** for PR 18692 at commit [`281f955`](https://github.com/apache/spark/commit/281f9555226f62e99d1143295f5bd364fff55fc0).




[GitHub] spark issue #18692: [SPARK-21417][SQL] Infer join conditions using propagate...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/18692
  
    Yea, I have the same feeling. If the left side has an `a = 1` constraint and the right side has a `b = 1` constraint, adding an `a = b` join condition does not help, as it always evaluates to true.
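    A small worked example of why the inferred condition filters nothing, using hypothetical toy data. Once both sides have been filtered by their constant constraints, every surviving pair already has `a = 1 = b`, so applying `a = b` to the cartesian product keeps every row.

    ```scala
    // Toy data: left column `a`, right column `b` (hypothetical values).
    val leftA  = Seq(1, 2, 1, 3)
    val rightB = Seq(1, 1, 4)

    // Constraints already applied to each side: a = 1 and b = 1.
    val filteredLeft  = leftA.filter(_ == 1)
    val filteredRight = rightB.filter(_ == 1)

    // Cartesian product of the filtered sides.
    val crossProduct = for { a <- filteredLeft; b <- filteredRight } yield (a, b)

    // Applying the inferred join condition a = b removes nothing.
    val withInferredCondition = crossProduct.filter { case (a, b) => a == b }
    ```

    Both results contain the same 4 pairs, so any benefit would have to come from the choice of physical join operator rather than from fewer output rows.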




[GitHub] spark issue #18692: [SPARK-21417][SQL] Infer join conditions using propagate...

Posted by dongjoon-hyun <gi...@git.apache.org>.
Github user dongjoon-hyun commented on the issue:

    https://github.com/apache/spark/pull/18692
  
    Hi, All.
    Since the commit was reverted from the master branch, can we update the status of the JIRA issue?
    - https://issues.apache.org/jira/browse/SPARK-21417




[GitHub] spark issue #18692: [SPARK-21417][SQL] Infer join conditions using propagate...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18692
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84177/
    Test PASSed.




[GitHub] spark issue #18692: [SPARK-21417][SQL] Infer join conditions using propagate...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on the issue:

    https://github.com/apache/spark/pull/18692
  
    Also add a test case for non-deterministic cases. For example, given the left child has `a = rand()` and the right child has `b = rand()`, we should not get `a = b`.
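    A toy demonstration of why `a = rand()` and `b = rand()` must not produce `a = b`: each side evaluates the non-deterministic expression independently, so matching expression text says nothing about the values. `FakeRand` is a hypothetical counter-based stand-in for `rand()`, chosen so the demo is repeatable.

    ```scala
    // Hypothetical stand-in for a non-deterministic expression such as rand():
    // every evaluation returns a new value (a counter here, so results are repeatable).
    final class FakeRand {
      private var next = 0
      def eval(): Int = { next += 1; next }
    }

    val rand = new FakeRand
    // Each side evaluates "the same" expression independently.
    val leftA  = Seq.fill(3)(rand.eval())  // a = rand() on the left: 1, 2, 3
    val rightB = Seq.fill(3)(rand.eval())  // b = rand() on the right: 4, 5, 6

    // Rows that would survive a wrongly inferred join condition a = b.
    val joined = for { a <- leftA; b <- rightB if a == b } yield (a, b)
    ```

    The cartesian product here has 9 rows, but an inferred `a = b` would remove all of them.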




[GitHub] spark issue #18692: [SPARK-21417][SQL] Detect joind conditions via filter ex...

Posted by aokolnychyi <gi...@git.apache.org>.
Github user aokolnychyi commented on the issue:

    https://github.com/apache/spark/pull/18692
  
    @gatorsmile thanks for the input. Let me check that I understood everything correctly. So, I keep it as a separate rule that is applied only if constraint propagation is enabled. Inside the rule, I rely on `join.constraints` to infer the join conditions. The remaining logic stays the same. Correct?
    
    I guess that `InferFiltersFromConstraints` can be used as a guideline.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---



[GitHub] spark issue #18692: [SPARK-21417][SQL] Infer join conditions using propagate...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18692
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #18692: [SPARK-21417][SQL] Infer join conditions using pr...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18692#discussion_r152417440
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
    @@ -152,3 +152,71 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
           if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
       }
     }
    +
    +/**
    + * A rule that uses propagated constraints to infer join conditions. The optimization is applicable
    + * only to CROSS joins.
    + *
    + * For instance, if there is a CROSS join, where the left relation has 'a = 1' and the right
    + * relation has 'b = 1', then the rule infers 'a = b' as a join predicate.
    + */
    +object InferJoinConditionsFromConstraints extends Rule[LogicalPlan] with PredicateHelper {
    +
    +  def apply(plan: LogicalPlan): LogicalPlan = {
    +    if (SQLConf.get.constraintPropagationEnabled) {
    +      inferJoinConditions(plan)
    +    } else {
    +      plan
    +    }
    +  }
    +
    +  private def inferJoinConditions(plan: LogicalPlan): LogicalPlan = plan transform {
    +    case join @ Join(left, right, Cross, conditionOpt) =>
    +      val leftConstraints = join.constraints.filter(_.references.subsetOf(left.outputSet))
    +      val rightConstraints = join.constraints.filter(_.references.subsetOf(right.outputSet))
    +      val inferredJoinPredicates = inferJoinPredicates(leftConstraints, rightConstraints)
    +
    +      val newConditionOpt = conditionOpt match {
    +        case Some(condition) =>
    +          val existingPredicates = splitConjunctivePredicates(condition)
    +          val newPredicates = findNewPredicates(inferredJoinPredicates, existingPredicates)
    +          if (newPredicates.nonEmpty) Some(And(newPredicates.reduce(And), condition)) else None
    +        case None =>
    +          inferredJoinPredicates.reduceOption(And)
    +      }
    +      if (newConditionOpt.isDefined) Join(left, right, Inner, newConditionOpt) else join
    --- End diff --
    
    Please see the rule `CheckCartesianProducts`. The example above is not a CROSS join.
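    For context: the `AnalysisException` quoted elsewhere in this thread reports a cartesian product for an "INNER join", i.e. the comma-separated `FROM t1, t2` reaches this check as an inner join without a condition rather than as `Cross`, so a rule that only matches `Cross` would never fire for it. A minimal sketch of a broader match, assuming a hypothetical `JoinType` ADT rather than Catalyst's own classes; it conservatively leaves outer joins alone:
    
    ```scala
    object CartesianMatchSketch {
      // Hypothetical stand-ins for Catalyst join types.
      sealed trait JoinType
      case object Inner extends JoinType
      case object Cross extends JoinType
      case object LeftOuter extends JoinType
    
      // True for joins that currently behave like a cartesian product: inner or cross, no condition.
      // Outer joins are excluded on purpose, since adding a condition to them needs separate care.
      def isCandidate(joinType: JoinType, condition: Option[String]): Boolean =
        (joinType, condition) match {
          case (Inner | Cross, None) => true
          case _                     => false
        }
    }
    ```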


---



[GitHub] spark issue #18692: [SPARK-21417][SQL] Detect joind conditions via filter ex...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on the issue:

    https://github.com/apache/spark/pull/18692
  
    You also need to resolve another case:
    ```Scala
            Seq((1, 2)).toDF("col1", "col2").write.saveAsTable("t1")
            Seq(1, 2).toDF("col").write.saveAsTable("t2")
            sql("SELECT * FROM t1, t2 WHERE t1.col1 = t1.col2 AND t1.col1 = 1 AND t2.col = 1")
    ```
    For this query, the new rule infers unneeded join conditions: `col2 = col1 AND col2 = col AND col1 = col`.
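    One possible mitigation, sketched on a toy model (the `Expr`/`Attr`/`Lit`/`Eq` types and the choice of the alphabetically smallest attribute as representative are hypothetical, not what the PR does): group the left-side attributes by the value they are equated to and emit a single predicate per right-side attribute. This relies on the left side's own filters (`col1 = col2`, `col1 = 1`) still being applied, which already make the other members of the group equal:
    
    ```scala
    object RepresentativeSketch {
      sealed trait Expr
      final case class Attr(name: String) extends Expr
      final case class Lit(value: Int) extends Expr
      final case class Eq(left: Expr, right: Expr) extends Expr
    
      // Collect `attr = value` bindings in either operand order.
      private def bindings(constraints: Set[Expr]): Set[(Attr, Expr)] = constraints.collect {
        case Eq(a: Attr, v) => (a, v)
        case Eq(v, a: Attr) => (a, v)
      }
    
      def inferJoinPredicates(left: Set[Expr], right: Set[Expr]): Set[Eq] = {
        // value expression -> all left attributes known to equal it
        val leftByValue: Map[Expr, Set[Attr]] =
          bindings(left).groupBy(_._2).map { case (v, pairs) => v -> pairs.map(_._1) }
        // For each right attribute, join against one representative instead of every equivalent.
        bindings(right).collect {
          case (r, v) if leftByValue.contains(v) => Eq(leftByValue(v).minBy(_.name), r)
        }
      }
    }
    ```
    
    Only cross-side predicates are produced, so a left-only predicate such as `col2 = col1` is never added to the join condition.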


---


[GitHub] spark pull request #18692: [SPARK-21417][SQL] Infer join conditions using pr...

Posted by aokolnychyi <gi...@git.apache.org>.
Github user aokolnychyi commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18692#discussion_r153329031
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
    @@ -152,3 +152,99 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
           if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
       }
     }
    +
    +/**
    + * A rule that eliminates CROSS joins by inferring join conditions from propagated constraints.
    + *
    + * The optimization is applicable only to CROSS joins. For other join types, adding inferred join
    + * conditions would potentially shuffle children as child node's partitioning won't satisfy the JOIN
    + * node's requirements which otherwise could have.
    + *
    + * For instance, if there is a CROSS join, where the left relation has 'a = 1' and the right
    + * relation has 'b = 1', the rule infers 'a = b' as a join predicate.
    + */
    +object EliminateCrossJoin extends Rule[LogicalPlan] with PredicateHelper {
    +
    +  def apply(plan: LogicalPlan): LogicalPlan = {
    +    if (SQLConf.get.constraintPropagationEnabled) {
    +      eliminateCrossJoin(plan)
    +    } else {
    +      plan
    +    }
    +  }
    +
    +  private def eliminateCrossJoin(plan: LogicalPlan): LogicalPlan = plan transform {
    +    case join@Join(leftPlan, rightPlan, Cross, None) =>
    +      val leftConstraints = join.constraints.filter(_.references.subsetOf(leftPlan.outputSet))
    +      val rightConstraints = join.constraints.filter(_.references.subsetOf(rightPlan.outputSet))
    +      val inferredJoinPredicates = inferJoinPredicates(leftConstraints, rightConstraints)
    +      val joinConditionOpt = inferredJoinPredicates.reduceOption(And)
    +      if (joinConditionOpt.isDefined) Join(leftPlan, rightPlan, Inner, joinConditionOpt) else join
    +  }
    +
    +  private def inferJoinPredicates(
    +      leftConstraints: Set[Expression],
    +      rightConstraints: Set[Expression]): Set[EqualTo] = {
    +
    +    // iterate through the left constraints and build a hash map that points semantically
    +    // equivalent expressions into attributes
    +    val emptyEquivalenceMap = Map.empty[SemanticExpression, Set[Attribute]]
    +    val equivalenceMap = leftConstraints.foldLeft(emptyEquivalenceMap) { case (map, constraint) =>
    +      constraint match {
    +        case EqualTo(attr: Attribute, expr: Expression) =>
    +          updateEquivalenceMap(map, attr, expr)
    +        case EqualTo(expr: Expression, attr: Attribute) =>
    +          updateEquivalenceMap(map, attr, expr)
    +        case _ => map
    +      }
    +    }
    +
    +    // iterate through the right constraints and infer join conditions using the equivalence map
    +    rightConstraints.foldLeft(Set.empty[EqualTo]) { case (joinConditions, constraint) =>
    +      constraint match {
    +        case EqualTo(attr: Attribute, expr: Expression) =>
    +          appendJoinConditions(attr, expr, equivalenceMap, joinConditions)
    +        case EqualTo(expr: Expression, attr: Attribute) =>
    +          appendJoinConditions(attr, expr, equivalenceMap, joinConditions)
    +        case _ => joinConditions
    +      }
    +    }
    +  }
    +
    +  private def updateEquivalenceMap(
    +      equivalenceMap: Map[SemanticExpression, Set[Attribute]],
    +      attr: Attribute,
    +      expr: Expression): Map[SemanticExpression, Set[Attribute]] = {
    +
    +    val equivalentAttrs = equivalenceMap.getOrElse(expr, Set.empty[Attribute])
    +    if (equivalentAttrs.contains(attr)) {
    +      equivalenceMap
    +    } else {
    +      equivalenceMap.updated(expr, equivalentAttrs + attr)
    +    }
    +  }
    +
    +  private def appendJoinConditions(
    +      attr: Attribute,
    +      expr: Expression,
    +      equivalenceMap: Map[SemanticExpression, Set[Attribute]],
    +      joinConditions: Set[EqualTo]): Set[EqualTo] = {
    +
    +    equivalenceMap.get(expr) match {
    +      case Some(equivalentAttrs) => joinConditions ++ equivalentAttrs.map(EqualTo(attr, _))
    +      case None => joinConditions
    +    }
    +  }
    +
    +  // the purpose of this class is to treat 'a === 1 and 1 === 'a as the same expressions
    +  implicit class SemanticExpression(private val expr: Expression) {
    --- End diff --
    
    Using a Set instead of a List might be beneficial in the proposed rule. What about the following?
    
    ```scala
    import scala.collection.mutable
    import org.apache.spark.sql.catalyst.expressions.Expression
    
    class EquivalentExpressionMap {
      // bring the private SemanticallyEqualExpr wrapper and its implicit conversion into scope
      import EquivalentExpressionMap._
    
      private val equivalenceMap = mutable.HashMap.empty[SemanticallyEqualExpr, mutable.Set[Expression]]
    
      def put(expression: Expression, equivalentExpression: Expression): Unit = {
        // pin the element type explicitly: mutable.Set is invariant
        val equivalentExpressions = equivalenceMap.getOrElse(expression, mutable.Set.empty[Expression])
        if (!equivalentExpressions.contains(equivalentExpression)) {
          equivalenceMap(expression) = equivalentExpressions += equivalentExpression
        }
      }
    
      // produce an immutable copy to avoid any modifications from outside
      def get(expression: Expression): Set[Expression] =
        equivalenceMap.get(expression).fold(Set.empty[Expression])(_.toSet)
    
    }
    
    object EquivalentExpressionMap {
    
      // wraps an expression so that equals/hashCode follow semantic equality,
      // e.g. 'a === 1 and 1 === 'a become the same key
      private implicit class SemanticallyEqualExpr(private val expr: Expression) {
        override def equals(o: Any): Boolean = o match {
          case other: SemanticallyEqualExpr => expr.semanticEquals(other.expr)
          case _ => false
        }
    
        override def hashCode: Int = expr.semanticHash()
      }
    }
    ```
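    To see why the wrapper matters, here is a self-contained toy version of the same idea. It assumes a simplified expression model (`Expr`, `Attr`, `Lit`, `Eq`) and a hypothetical `canonical` function that only orders the operands of `=`; Catalyst's real notion of semantic equality is `semanticEquals`/`semanticHash`. Keys that differ only in operand order end up in the same bucket:
    
    ```scala
    import scala.collection.mutable
    
    object SemanticKeySketch {
      sealed trait Expr
      final case class Attr(name: String) extends Expr
      final case class Lit(value: Int) extends Expr
      final case class Eq(left: Expr, right: Expr) extends Expr
    
      // Toy canonical form: order the operands of `=` so that `a = 1` and `1 = a` coincide.
      def canonical(e: Expr): Expr = e match {
        case Eq(l, r) =>
          val (cl, cr) = (canonical(l), canonical(r))
          if (cl.toString <= cr.toString) Eq(cl, cr) else Eq(cr, cl)
        case other => other
      }
    
      // Wrapper whose equals/hashCode are based on the canonical form, not the raw expression.
      final class SemanticKey(val expr: Expr) {
        private val canon = canonical(expr)
        override def equals(o: Any): Boolean = o match {
          case other: SemanticKey => canon == other.canon
          case _ => false
        }
        override def hashCode: Int = canon.hashCode
      }
    
      final class EquivalenceMap {
        private val underlying = mutable.HashMap.empty[SemanticKey, mutable.Set[Expr]]
    
        def put(key: Expr, equivalent: Expr): Unit =
          underlying.getOrElseUpdate(new SemanticKey(key), mutable.Set.empty[Expr]) += equivalent
    
        // Return an immutable copy so callers cannot mutate the internal state.
        def get(key: Expr): Set[Expr] =
          underlying.get(new SemanticKey(key)).fold(Set.empty[Expr])(_.toSet)
      }
    }
    ```
    
    Using `getOrElseUpdate` folds the "look up, create if missing, insert" sequence into one call; the `contains` check in the proposal is not strictly needed since adding to a set is already idempotent.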


---



[GitHub] spark pull request #18692: [SPARK-21417][SQL] Infer join conditions using pr...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18692#discussion_r137110438
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
    @@ -152,3 +152,71 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
           if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
       }
     }
    +
    +/**
    + * A rule that uses propagated constraints to infer join conditions. The optimization is applicable
    + * only to CROSS joins.
    + *
    + * For instance, if there is a CROSS join, where the left relation has 'a = 1' and the right
    + * relation has 'b = 1', then the rule infers 'a = b' as a join predicate.
    + */
    +object InferJoinConditionsFromConstraints extends Rule[LogicalPlan] with PredicateHelper {
    +
    +  def apply(plan: LogicalPlan): LogicalPlan = {
    +    if (SQLConf.get.constraintPropagationEnabled) {
    +      inferJoinConditions(plan)
    +    } else {
    +      plan
    +    }
    +  }
    +
    +  private def inferJoinConditions(plan: LogicalPlan): LogicalPlan = plan transform {
    +    case join @ Join(left, right, Cross, conditionOpt) =>
    +      val leftConstraints = join.constraints.filter(_.references.subsetOf(left.outputSet))
    +      val rightConstraints = join.constraints.filter(_.references.subsetOf(right.outputSet))
    +      val inferredJoinPredicates = inferJoinPredicates(leftConstraints, rightConstraints)
    +
    +      val newConditionOpt = conditionOpt match {
    +        case Some(condition) =>
    +          val existingPredicates = splitConjunctivePredicates(condition)
    +          val newPredicates = findNewPredicates(inferredJoinPredicates, existingPredicates)
    +          if (newPredicates.nonEmpty) Some(And(newPredicates.reduce(And), condition)) else None
    +        case None =>
    +          inferredJoinPredicates.reduceOption(And)
    +      }
    +      if (newConditionOpt.isDefined) Join(left, right, Inner, newConditionOpt) else join
    +  }
    +
    +  private def inferJoinPredicates(
    +      relationConstraints: Set[Expression],
    +      anotherRelationConstraints: Set[Expression]): Set[Expression] = {
    +
    +    relationConstraints.flatMap {
    +      case EqualTo(attr: Attribute, equivalentExpr) =>
    +        collectJoinPredicatesForAttribute(attr, equivalentExpr, anotherRelationConstraints)
    +      case EqualTo(equivalentExpr, attr: Attribute) =>
    +        collectJoinPredicatesForAttribute(attr, equivalentExpr, anotherRelationConstraints)
    +      case _ => Nil
    +    }
    +  }
    +
    +  private def collectJoinPredicatesForAttribute(
    +      attr: Attribute,
    +      equivalentExpr: Expression,
    +      anotherRelationConstraints: Set[Expression]): Set[Expression] = {
    +
    +    anotherRelationConstraints.collect {
    +      case EqualTo(anotherRelationAttr: Attribute, expr) if expr.semanticEquals(equivalentExpr) =>
    +        EqualTo(attr, anotherRelationAttr)
    +      case EqualTo(expr, rightAttribute: Attribute) if expr.semanticEquals(equivalentExpr) =>
    +        EqualTo(attr, rightAttribute)
    +    }
    +  }
    +
    +  private def findNewPredicates(
    --- End diff --
    
    This function can be removed.


---



[GitHub] spark issue #18692: [SPARK-21417][SQL] Infer join conditions using propagate...

Posted by SimonBin <gi...@git.apache.org>.
Github user SimonBin commented on the issue:

    https://github.com/apache/spark/pull/18692
  
    Hi, we are very interested in this patch. I wonder whether it could handle the following query automatically, without requiring an explicit join:
    
    ```scala
    package net.sansa_stack.spark.playground
    
    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
    import org.scalatest._
    
    class TestSparkSqlJoin extends FlatSpec {
    
      "SPARK SQL processor" should "be capable of handling transitive join conditions" in {
    
        val spark = SparkSession
          .builder()
          .master("local[1]")
          .getOrCreate()
    
        val schema = new StructType()
          .add("s", IntegerType, nullable = true)
          .add("p", IntegerType, nullable = true)
          .add("o", IntegerType, nullable = true)
    
        val data = List((1, 2, 3))
        val dataRDD = spark.sparkContext.parallelize(data).map(attributes => Row(attributes._1, attributes._2, attributes._3))
        spark.createDataFrame(dataRDD, schema).createOrReplaceTempView("T")
    
        spark.sql("SELECT A.s FROM T A, T B WHERE A.s = 1 AND B.s = 1").explain(true)
      }
    
    }
    ```
    
    
    I built this pull request locally, but it still fails with the same error:
    
    ```
    == Physical Plan ==
    org.apache.spark.sql.AnalysisException: Detected cartesian product for INNER join between logical plans
    Project [s#3]
    +- Filter (isnotnull(s#3) && (s#3 = 1))
       +- LogicalRDD [s#3, p#4, o#5], false
    and
    Project
    +- Filter (isnotnull(s#25) && (s#25 = 1))
       +- LogicalRDD [s#25, p#26, o#27], false
    Join condition is missing or trivial.
    Use the CROSS JOIN syntax to allow cartesian products between these relations.;
    ```


---



[GitHub] spark issue #18692: [SPARK-21417][SQL] Detect joind conditions via filter ex...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on the issue:

    https://github.com/apache/spark/pull/18692
  
    You are on the right track. You might find some bugs/issues while implementing it. Sorry, I have been too busy recently.


---


[GitHub] spark pull request #18692: [SPARK-21417][SQL] Infer join conditions using pr...

Posted by aokolnychyi <gi...@git.apache.org>.
Github user aokolnychyi commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18692#discussion_r145498671
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
    @@ -152,3 +152,79 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
           if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
       }
     }
    +
    +/**
    + * A rule that uses propagated constraints to infer join conditions. The optimization is applicable
    + * only to CROSS joins. For other join types, adding inferred join conditions would potentially
    + * shuffle children as child node's partitioning won't satisfy the JOIN node's requirements
    + * which otherwise could have.
    + *
    + * For instance, if there is a CROSS join, where the left relation has 'a = 1' and the right
    + * relation has 'b = 1', then the rule infers 'a = b' as a join predicate.
    + */
    +object InferJoinConditionsFromConstraints extends Rule[LogicalPlan] with PredicateHelper {
    +
    +  def apply(plan: LogicalPlan): LogicalPlan = {
    +    if (SQLConf.get.constraintPropagationEnabled) {
    +      inferJoinConditions(plan)
    +    } else {
    +      plan
    +    }
    +  }
    +
    +  private def inferJoinConditions(plan: LogicalPlan): LogicalPlan = plan transform {
    +    case join @ Join(left, right, Cross, conditionOpt) =>
    +
    +      val rightEqualToPredicates = join.constraints.collect {
    --- End diff --
    
    I thought about improving the time complexity here with a hash map that uses semantic equals/hashCode. However, that would require a wrapper class, so I kept it as is.


---



[GitHub] spark issue #18692: [SPARK-21417][SQL] Infer join conditions using propagate...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/18692
  
    > After adding the inferred join conditions, it might lead to the child node's partitioning NOT satisfying the JOIN node's requirements which otherwise could have.
    
    Isn't that an existing problem? The current constraint propagation framework infers as many predicates as possible, so we may already hit this problem. I think we should revisit the constraint propagation framework to figure out how to avoid adding extra shuffles, rather than stop improving it to infer more predicates.


---



[GitHub] spark issue #18692: [SPARK-21417][SQL] Infer join conditions using propagate...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/18692
  
    > The point of this issue is not performance improvement, but that some (in our case automatically generated) queries do not work at all with SPARK, whereas there is no problem with these queries in PostgreSQL and MySQL.
    
    I'm surprised to hear this. Did you enable cross joins via `spark.sql.crossJoin.enabled`?


---



[GitHub] spark pull request #18692: [SPARK-21417][SQL] Infer join conditions using pr...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18692#discussion_r154272738
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
    @@ -152,3 +153,62 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
           if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
       }
     }
    +
    +/**
    + * A rule that eliminates CROSS joins by inferring join conditions from propagated constraints.
    + *
    + * The optimization is applicable only to CROSS joins. For other join types, adding inferred join
    --- End diff --
    
    It sounds promising. 


---



[GitHub] spark issue #18692: [SPARK-21417][SQL] Infer join conditions using propagate...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/18692
  
    You are right. In that case, I don't know whether there is any valid use case for inferring join conditions from literals...


---



[GitHub] spark issue #18692: [SPARK-21417][SQL] Detect joind conditions via filter ex...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18692
  
    **[Test build #80058 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/80058/testReport)** for PR 18692 at commit [`851f388`](https://github.com/apache/spark/commit/851f3885c80a620415c142f09f5cc320d80b85e6).


---


[GitHub] spark issue #18692: [SPARK-21417][SQL] Infer join conditions using propagate...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on the issue:

    https://github.com/apache/spark/pull/18692
  
    Done. Reverted.


---



[GitHub] spark issue #18692: [SPARK-21417][SQL] Detect joind conditions via filter ex...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18692
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/80056/


---


[GitHub] spark pull request #18692: [SPARK-21417][SQL] Infer join conditions using pr...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18692#discussion_r153621601
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
    @@ -152,3 +152,99 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
           if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
       }
     }
    +
    +/**
    + * A rule that eliminates CROSS joins by inferring join conditions from propagated constraints.
    + *
    + * The optimization is applicable only to CROSS joins. For other join types, adding inferred join
    + * conditions would potentially shuffle children as child node's partitioning won't satisfy the JOIN
    + * node's requirements which otherwise could have.
    + *
    + * For instance, if there is a CROSS join, where the left relation has 'a = 1' and the right
    + * relation has 'b = 1', the rule infers 'a = b' as a join predicate.
    + */
    +object EliminateCrossJoin extends Rule[LogicalPlan] with PredicateHelper {
    +
    +  def apply(plan: LogicalPlan): LogicalPlan = {
    +    if (SQLConf.get.constraintPropagationEnabled) {
    +      eliminateCrossJoin(plan)
    +    } else {
    +      plan
    +    }
    +  }
    +
    +  private def eliminateCrossJoin(plan: LogicalPlan): LogicalPlan = plan transform {
    +    case join@Join(leftPlan, rightPlan, Cross, None) =>
    +      val leftConstraints = join.constraints.filter(_.references.subsetOf(leftPlan.outputSet))
    +      val rightConstraints = join.constraints.filter(_.references.subsetOf(rightPlan.outputSet))
    +      val inferredJoinPredicates = inferJoinPredicates(leftConstraints, rightConstraints)
    +      val joinConditionOpt = inferredJoinPredicates.reduceOption(And)
    +      if (joinConditionOpt.isDefined) Join(leftPlan, rightPlan, Inner, joinConditionOpt) else join
    +  }
    +
    +  private def inferJoinPredicates(
    +      leftConstraints: Set[Expression],
    +      rightConstraints: Set[Expression]): Set[EqualTo] = {
    +
    +    // iterate through the left constraints and build a hash map that points semantically
    +    // equivalent expressions into attributes
    +    val emptyEquivalenceMap = Map.empty[SemanticExpression, Set[Attribute]]
    +    val equivalenceMap = leftConstraints.foldLeft(emptyEquivalenceMap) { case (map, constraint) =>
    +      constraint match {
    +        case EqualTo(attr: Attribute, expr: Expression) =>
    +          updateEquivalenceMap(map, attr, expr)
    +        case EqualTo(expr: Expression, attr: Attribute) =>
    +          updateEquivalenceMap(map, attr, expr)
    +        case _ => map
    +      }
    +    }
    +
    +    // iterate through the right constraints and infer join conditions using the equivalence map
    +    rightConstraints.foldLeft(Set.empty[EqualTo]) { case (joinConditions, constraint) =>
    +      constraint match {
    +        case EqualTo(attr: Attribute, expr: Expression) =>
    +          appendJoinConditions(attr, expr, equivalenceMap, joinConditions)
    +        case EqualTo(expr: Expression, attr: Attribute) =>
    +          appendJoinConditions(attr, expr, equivalenceMap, joinConditions)
    +        case _ => joinConditions
    +      }
    +    }
    +  }
    +
    +  private def updateEquivalenceMap(
    +      equivalenceMap: Map[SemanticExpression, Set[Attribute]],
    +      attr: Attribute,
    +      expr: Expression): Map[SemanticExpression, Set[Attribute]] = {
    +
    +    val equivalentAttrs = equivalenceMap.getOrElse(expr, Set.empty[Attribute])
    +    if (equivalentAttrs.contains(attr)) {
    +      equivalenceMap
    +    } else {
    +      equivalenceMap.updated(expr, equivalentAttrs + attr)
    +    }
    +  }
    +
    +  private def appendJoinConditions(
    +      attr: Attribute,
    +      expr: Expression,
    +      equivalenceMap: Map[SemanticExpression, Set[Attribute]],
    +      joinConditions: Set[EqualTo]): Set[EqualTo] = {
    +
    +    equivalenceMap.get(expr) match {
    +      case Some(equivalentAttrs) => joinConditions ++ equivalentAttrs.map(EqualTo(attr, _))
    +      case None => joinConditions
    +    }
    +  }
    +
    +  // the purpose of this class is to treat 'a === 1 and 1 === 'a as the same expressions
    +  implicit class SemanticExpression(private val expr: Expression) {
    --- End diff --
    
    I mean `def get(expression: Expression): ExpressionSet`
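    A toy illustration of what returning an `ExpressionSet` would add: the values themselves are deduplicated semantically, so `a = 1` and `1 = a` are stored only once. `ExprSet`, `canonical` and the expression classes below are hypothetical simplifications; Catalyst's `ExpressionSet` plays this role for real expressions using their canonicalized form:
    
    ```scala
    object ExprSetSketch {
      sealed trait Expr
      final case class Attr(name: String) extends Expr
      final case class Lit(value: Int) extends Expr
      final case class Eq(left: Expr, right: Expr) extends Expr
    
      // Toy canonical form: order the operands of `=` so that `a = 1` and `1 = a` coincide.
      def canonical(e: Expr): Expr = e match {
        case Eq(l, r) =>
          val (cl, cr) = (canonical(l), canonical(r))
          if (cl.toString <= cr.toString) Eq(cl, cr) else Eq(cr, cl)
        case other => other
      }
    
      // Immutable set that keeps the first original expression per canonical form;
      // adding a semantically equal expression afterwards is a no-op.
      final case class ExprSet(private val byCanonical: Map[Expr, Expr] = Map.empty) {
        def +(e: Expr): ExprSet = {
          val key = canonical(e)
          if (byCanonical.contains(key)) this else ExprSet(byCanonical + (key -> e))
        }
        def contains(e: Expr): Boolean = byCanonical.contains(canonical(e))
        def size: Int = byCanonical.size
        def originals: Set[Expr] = byCanonical.values.toSet
      }
    }
    ```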


---



[GitHub] spark issue #18692: [SPARK-21417][SQL] Infer join conditions using propagate...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18692
  
    **[Test build #81390 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81390/testReport)** for PR 18692 at commit [`cfeae46`](https://github.com/apache/spark/commit/cfeae46766a6ccb1b1a0113fe41cdb52b16897f3).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #18692: [SPARK-21417][SQL] Detect joind conditions via filter ex...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18692
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/80058/


---


[GitHub] spark issue #18692: [SPARK-21417][SQL] Detect joind conditions via filter ex...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18692
  
    **[Test build #80058 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/80058/testReport)** for PR 18692 at commit [`851f388`](https://github.com/apache/spark/commit/851f3885c80a620415c142f09f5cc320d80b85e6).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---


[GitHub] spark issue #18692: [SPARK-21417][SQL] Infer join conditions using propagate...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18692
  
    **[Test build #84349 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84349/testReport)** for PR 18692 at commit [`0e5a9f2`](https://github.com/apache/spark/commit/0e5a9f20372b9687538b55dd2915af122afe7cb6).
     * This patch **fails Scala style tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class EquivalentExpressionMap`


---



[GitHub] spark issue #18692: [SPARK-21417][SQL] Infer join conditions using propagate...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18692
  
    **[Test build #84176 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84176/testReport)** for PR 18692 at commit [`ab7d966`](https://github.com/apache/spark/commit/ab7d966b9fa42c8852da4f3a196ab891b845c1b8).
     * This patch **fails Spark unit tests**.
     * This patch **does not merge cleanly**.
     * This patch adds no public classes.


---



[GitHub] spark issue #18692: [SPARK-21417][SQL] Infer join conditions using propagate...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18692
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #18692: [SPARK-21417][SQL] Infer join conditions using propagate...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18692
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #18692: [SPARK-21417][SQL] Infer join conditions using propagate...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18692
  
    **[Test build #82891 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82891/testReport)** for PR 18692 at commit [`e0e6ad3`](https://github.com/apache/spark/commit/e0e6ad381fc6f9b0b957e91e7c7df35207190021).


---



[GitHub] spark issue #18692: [SPARK-21417][SQL] Infer join conditions using propagate...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18692
  
    **[Test build #84349 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84349/testReport)** for PR 18692 at commit [`0e5a9f2`](https://github.com/apache/spark/commit/0e5a9f20372b9687538b55dd2915af122afe7cb6).


---



[GitHub] spark issue #18692: [SPARK-21417][SQL] Detect joind conditions via filter ex...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18692
  
    Merged build finished. Test PASSed.


---


[GitHub] spark pull request #18692: [SPARK-21417][SQL] Infer join conditions using pr...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/18692


---



[GitHub] spark pull request #18692: [SPARK-21417][SQL] Infer join conditions using pr...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18692#discussion_r153060595
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
    @@ -152,3 +152,99 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
           if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
       }
     }
    +
    +/**
    + * A rule that eliminates CROSS joins by inferring join conditions from propagated constraints.
    + *
    + * The optimization is applicable only to CROSS joins. For other join types, adding inferred join
    + * conditions would potentially shuffle children as child node's partitioning won't satisfy the JOIN
    + * node's requirements which otherwise could have.
    + *
    + * For instance, if there is a CROSS join, where the left relation has 'a = 1' and the right
    + * relation has 'b = 1', the rule infers 'a = b' as a join predicate.
    + */
    +object EliminateCrossJoin extends Rule[LogicalPlan] with PredicateHelper {
    +
    +  def apply(plan: LogicalPlan): LogicalPlan = {
    +    if (SQLConf.get.constraintPropagationEnabled) {
    +      eliminateCrossJoin(plan)
    +    } else {
    +      plan
    +    }
    +  }
    +
    +  private def eliminateCrossJoin(plan: LogicalPlan): LogicalPlan = plan transform {
    +    case join@Join(leftPlan, rightPlan, Cross, None) =>
    +      val leftConstraints = join.constraints.filter(_.references.subsetOf(leftPlan.outputSet))
    +      val rightConstraints = join.constraints.filter(_.references.subsetOf(rightPlan.outputSet))
    +      val inferredJoinPredicates = inferJoinPredicates(leftConstraints, rightConstraints)
    +      val joinConditionOpt = inferredJoinPredicates.reduceOption(And)
    +      if (joinConditionOpt.isDefined) Join(leftPlan, rightPlan, Inner, joinConditionOpt) else join
    +  }
    +
    +  private def inferJoinPredicates(
    +      leftConstraints: Set[Expression],
    +      rightConstraints: Set[Expression]): Set[EqualTo] = {
    +
    +    // iterate through the left constraints and build a hash map that points semantically
    +    // equivalent expressions into attributes
    +    val emptyEquivalenceMap = Map.empty[SemanticExpression, Set[Attribute]]
    +    val equivalenceMap = leftConstraints.foldLeft(emptyEquivalenceMap) { case (map, constraint) =>
    +      constraint match {
    +        case EqualTo(attr: Attribute, expr: Expression) =>
    +          updateEquivalenceMap(map, attr, expr)
    +        case EqualTo(expr: Expression, attr: Attribute) =>
    +          updateEquivalenceMap(map, attr, expr)
    +        case _ => map
    +      }
    +    }
    +
    +    // iterate through the right constraints and infer join conditions using the equivalence map
    +    rightConstraints.foldLeft(Set.empty[EqualTo]) { case (joinConditions, constraint) =>
    +      constraint match {
    +        case EqualTo(attr: Attribute, expr: Expression) =>
    +          appendJoinConditions(attr, expr, equivalenceMap, joinConditions)
    +        case EqualTo(expr: Expression, attr: Attribute) =>
    +          appendJoinConditions(attr, expr, equivalenceMap, joinConditions)
    +        case _ => joinConditions
    +      }
    +    }
    +  }
    +
    +  private def updateEquivalenceMap(
    +      equivalenceMap: Map[SemanticExpression, Set[Attribute]],
    +      attr: Attribute,
    +      expr: Expression): Map[SemanticExpression, Set[Attribute]] = {
    +
    +    val equivalentAttrs = equivalenceMap.getOrElse(expr, Set.empty[Attribute])
    +    if (equivalentAttrs.contains(attr)) {
    +      equivalenceMap
    +    } else {
    +      equivalenceMap.updated(expr, equivalentAttrs + attr)
    +    }
    +  }
    +
    +  private def appendJoinConditions(
    +      attr: Attribute,
    +      expr: Expression,
    +      equivalenceMap: Map[SemanticExpression, Set[Attribute]],
    +      joinConditions: Set[EqualTo]): Set[EqualTo] = {
    +
    +    equivalenceMap.get(expr) match {
    +      case Some(equivalentAttrs) => joinConditions ++ equivalentAttrs.map(EqualTo(attr, _))
    +      case None => joinConditions
    +    }
    +  }
    +
    +  // the purpose of this class is to treat 'a === 1 and 1 === 'a as the same expressions
    +  implicit class SemanticExpression(private val expr: Expression) {
    --- End diff --
    
    Can we reuse `EquivalentExpressions`? You can search the code base to see how others use it.
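The left/right inference performed by `inferJoinPredicates` in the diff can be illustrated outside Catalyst. The following is a minimal sketch, not Spark code: `Term`, `Attr`, `Lit`, and `Eq` are hypothetical stand-ins for Catalyst's `Expression`, `Attribute`, `Literal`, and `EqualTo`, and semantic equivalence of the non-attribute side is reduced to plain structural equality. Like the diff, it accepts both the `a = 1` and `1 = a` orientations.

```scala
// Toy stand-ins for Catalyst expressions (hypothetical, for illustration only).
sealed trait Term
final case class Attr(name: String) extends Term
final case class Lit(value: Int) extends Term
final case class Eq(left: Term, right: Term) extends Term

object InferJoinPredicatesSketch {

  // Extract (attribute, other side) from either orientation: `a = 1` or `1 = a`.
  private def attrAndTerm(constraint: Term): Option[(Attr, Term)] = constraint match {
    case Eq(a: Attr, t) => Some((a, t))
    case Eq(t, a: Attr) => Some((a, t))
    case _              => None
  }

  def inferJoinPredicates(leftConstraints: Set[Term], rightConstraints: Set[Term]): Set[Eq] = {
    // Map each term to the set of left-side attributes known to be equal to it.
    val equivalenceMap: Map[Term, Set[Attr]] =
      leftConstraints.flatMap(attrAndTerm).foldLeft(Map.empty[Term, Set[Attr]]) {
        case (map, (attr, term)) =>
          map.updated(term, map.getOrElse(term, Set.empty[Attr]) + attr)
      }

    // For each right-side `attr = term`, emit `attr = leftAttr` for every left attribute
    // that is equal to the same term.
    rightConstraints.flatMap(attrAndTerm).flatMap { case (attr, term) =>
      equivalenceMap.getOrElse(term, Set.empty[Attr]).map(leftAttr => Eq(attr, leftAttr))
    }
  }
}
```

With `a = 1` on the left and `1 = b` on the right, the sketch infers `b = a`; with `b = 2` on the right, it infers nothing.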


---



[GitHub] spark issue #18692: [SPARK-21417][SQL] Infer join conditions using propagate...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18692
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84349/


---



[GitHub] spark issue #18692: [SPARK-21417][SQL] Infer join conditions using propagate...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on the issue:

    https://github.com/apache/spark/pull/18692
  
    @aokolnychyi Could you rethink it using cases like `a in (0, 2, 3, 4)` and `b in (0, 2, 3, 4)`, and then infer `a = b`?


---



[GitHub] spark issue #18692: [SPARK-21417][SQL] Infer join conditions using propagate...

Posted by Aklakan <gi...@git.apache.org>.
Github user Aklakan commented on the issue:

    https://github.com/apache/spark/pull/18692
  
    Hi, it's unfortunate to see this PR get reverted.
    
    @gatorsmile 
    > After rethinking about it, we might need to revert this PR. Although it converts a CROSS Join to an Inner join, it does not improve the performance.
    
    The point of this issue is not performance improvement, but that such queries (in our case *automatically generated queries*) *do not work at all* with Spark, whereas they run without problems in PostgreSQL and MySQL.
    
    @aokolnychyi 
    `IN` predicates are not equality constraints (even though they could be expanded: `a in (x, y, z) -> a == x OR a == y OR a == z`), so your example does not apply to inferring join conditions from literal equalities.
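The difference between equality and `IN` constraints can be checked by brute force over a small domain. In this illustrative sketch (not part of Spark; the domain `0 to 5` is an arbitrary assumption), a join predicate `a = b` counts as implied when every pair of values that passes both per-side filters also satisfies `a = b`.

```scala
object InferenceCheck {
  // Arbitrary small domain of column values (an assumption for this illustration).
  val domain: Seq[Int] = 0 to 5

  // True when every (a, b) that passes both filters also satisfies a = b.
  def impliesEquality(leftFilter: Int => Boolean, rightFilter: Int => Boolean): Boolean =
    (for {
      a <- domain if leftFilter(a)
      b <- domain if rightFilter(b)
    } yield a == b).forall(identity)

  // Equality filters: a = 1 and b = 1.
  val equalityImplies: Boolean = impliesEquality(_ == 1, _ == 1)

  // IN filters: a IN (0, 2, 3, 4) and b IN (0, 2, 3, 4).
  private val inList: Set[Int] = Set(0, 2, 3, 4)
  val inImplies: Boolean = impliesEquality(inList.contains, inList.contains)
}
```

The equality case holds in general by transitivity (`a = 1` and `b = 1` give `a = b`). For `IN`, a single counterexample such as `a = 0, b = 2` is enough to show that `a = b` cannot be inferred; the finite domain only serves to find it.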


---



[GitHub] spark issue #18692: [SPARK-21417][SQL] Infer join conditions using propagate...

Posted by aokolnychyi <gi...@git.apache.org>.
Github user aokolnychyi commented on the issue:

    https://github.com/apache/spark/pull/18692
  
    @gatorsmile I updated the rule to cover cross join cases. Regarding the redundant-condition case you mentioned, I opened [SPARK-21652](https://issues.apache.org/jira/browse/SPARK-21652). It is an existing issue and is not caused by the proposed rule. BTW, I can try to fix it once we agree on a solution.


---


[GitHub] spark pull request #18692: [SPARK-21417][SQL] Infer join conditions using pr...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18692#discussion_r137110370
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
    @@ -152,3 +152,71 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
           if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
       }
     }
    +
    +/**
    + * A rule that uses propagated constraints to infer join conditions. The optimization is applicable
    + * only to CROSS joins.
    + *
    + * For instance, if there is a CROSS join, where the left relation has 'a = 1' and the right
    + * relation has 'b = 1', then the rule infers 'a = b' as a join predicate.
    + */
    +object InferJoinConditionsFromConstraints extends Rule[LogicalPlan] with PredicateHelper {
    +
    +  def apply(plan: LogicalPlan): LogicalPlan = {
    +    if (SQLConf.get.constraintPropagationEnabled) {
    +      inferJoinConditions(plan)
    +    } else {
    +      plan
    +    }
    +  }
    +
    +  private def inferJoinConditions(plan: LogicalPlan): LogicalPlan = plan transform {
    +    case join @ Join(left, right, Cross, conditionOpt) =>
    +      val leftConstraints = join.constraints.filter(_.references.subsetOf(left.outputSet))
    +      val rightConstraints = join.constraints.filter(_.references.subsetOf(right.outputSet))
    +      val inferredJoinPredicates = inferJoinPredicates(leftConstraints, rightConstraints)
    +
    +      val newConditionOpt = conditionOpt match {
    +        case Some(condition) =>
    +          val existingPredicates = splitConjunctivePredicates(condition)
    +          val newPredicates = findNewPredicates(inferredJoinPredicates, existingPredicates)
    +          if (newPredicates.nonEmpty) Some(And(newPredicates.reduce(And), condition)) else None
    +        case None =>
    +          inferredJoinPredicates.reduceOption(And)
    +      }
    +      if (newConditionOpt.isDefined) Join(left, right, Inner, newConditionOpt) else join
    --- End diff --
    
    Based on the discussion above, we should only enable this rule for Cartesian products. That means the code above should look like this:
    ```Scala
        case join @ Join(left, right, Cross, None) =>
          val leftConstraints = join.constraints.filter(_.references.subsetOf(left.outputSet))
          val rightConstraints = join.constraints.filter(_.references.subsetOf(right.outputSet))
          val inferredJoinPredicates = inferJoinPredicates(leftConstraints, rightConstraints)
          val newConditionOpt = inferredJoinPredicates.reduceOption(And)
          if (newConditionOpt.isDefined) Join(left, right, Inner, newConditionOpt) else join
    ```
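The restriction suggested above (rewrite only a `Cross` join that has no condition) can be modeled with a toy plan. This is a minimal sketch under stated assumptions: `JoinType`, `Join`, and the string-valued predicates are hypothetical simplifications, not Catalyst classes, and the inferred predicates are passed in rather than derived from constraints.

```scala
sealed trait JoinType
case object Cross extends JoinType
case object Inner extends JoinType

// Toy join node: tables and predicates are plain strings for illustration only.
final case class Join(left: String, right: String, joinType: JoinType, condition: Option[String])

object CrossJoinRestriction {
  // Mirrors the suggested pattern `case join @ Join(left, right, Cross, None)`.
  def rewrite(join: Join, inferredPredicates: Seq[String]): Join = join match {
    case Join(left, right, Cross, None) =>
      // Combine inferred predicates with AND; keep the join unchanged if nothing was inferred.
      inferredPredicates.reduceOption((p, q) => s"($p AND $q)") match {
        case Some(condition) => Join(left, right, Inner, Some(condition))
        case None            => join
      }
    // Joins that already have a condition, or are not CROSS joins, are left untouched.
    case other => other
  }
}
```

Because a CROSS join that already has a condition is skipped, the rewrite only affects plans that would otherwise be Cartesian products.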


---



[GitHub] spark pull request #18692: [SPARK-21417][SQL] Infer join conditions using pr...

Posted by aokolnychyi <gi...@git.apache.org>.
Github user aokolnychyi commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18692#discussion_r153420088
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
    @@ -152,3 +152,99 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
           if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
       }
     }
    +
    +/**
    + * A rule that eliminates CROSS joins by inferring join conditions from propagated constraints.
    + *
    + * The optimization is applicable only to CROSS joins. For other join types, adding inferred join
    + * conditions would potentially shuffle children as child node's partitioning won't satisfy the JOIN
    + * node's requirements which otherwise could have.
    + *
    + * For instance, if there is a CROSS join, where the left relation has 'a = 1' and the right
    + * relation has 'b = 1', the rule infers 'a = b' as a join predicate.
    + */
    +object EliminateCrossJoin extends Rule[LogicalPlan] with PredicateHelper {
    +
    +  def apply(plan: LogicalPlan): LogicalPlan = {
    +    if (SQLConf.get.constraintPropagationEnabled) {
    +      eliminateCrossJoin(plan)
    +    } else {
    +      plan
    +    }
    +  }
    +
    +  private def eliminateCrossJoin(plan: LogicalPlan): LogicalPlan = plan transform {
    +    case join@Join(leftPlan, rightPlan, Cross, None) =>
    +      val leftConstraints = join.constraints.filter(_.references.subsetOf(leftPlan.outputSet))
    +      val rightConstraints = join.constraints.filter(_.references.subsetOf(rightPlan.outputSet))
    +      val inferredJoinPredicates = inferJoinPredicates(leftConstraints, rightConstraints)
    +      val joinConditionOpt = inferredJoinPredicates.reduceOption(And)
    +      if (joinConditionOpt.isDefined) Join(leftPlan, rightPlan, Inner, joinConditionOpt) else join
    +  }
    +
    +  private def inferJoinPredicates(
    +      leftConstraints: Set[Expression],
    +      rightConstraints: Set[Expression]): Set[EqualTo] = {
    +
    +    // iterate through the left constraints and build a hash map that points semantically
    +    // equivalent expressions into attributes
    +    val emptyEquivalenceMap = Map.empty[SemanticExpression, Set[Attribute]]
    +    val equivalenceMap = leftConstraints.foldLeft(emptyEquivalenceMap) { case (map, constraint) =>
    +      constraint match {
    +        case EqualTo(attr: Attribute, expr: Expression) =>
    +          updateEquivalenceMap(map, attr, expr)
    +        case EqualTo(expr: Expression, attr: Attribute) =>
    +          updateEquivalenceMap(map, attr, expr)
    +        case _ => map
    +      }
    +    }
    +
    +    // iterate through the right constraints and infer join conditions using the equivalence map
    +    rightConstraints.foldLeft(Set.empty[EqualTo]) { case (joinConditions, constraint) =>
    +      constraint match {
    +        case EqualTo(attr: Attribute, expr: Expression) =>
    +          appendJoinConditions(attr, expr, equivalenceMap, joinConditions)
    +        case EqualTo(expr: Expression, attr: Attribute) =>
    +          appendJoinConditions(attr, expr, equivalenceMap, joinConditions)
    +        case _ => joinConditions
    +      }
    +    }
    +  }
    +
    +  private def updateEquivalenceMap(
    +      equivalenceMap: Map[SemanticExpression, Set[Attribute]],
    +      attr: Attribute,
    +      expr: Expression): Map[SemanticExpression, Set[Attribute]] = {
    +
    +    val equivalentAttrs = equivalenceMap.getOrElse(expr, Set.empty[Attribute])
    +    if (equivalentAttrs.contains(attr)) {
    +      equivalenceMap
    +    } else {
    +      equivalenceMap.updated(expr, equivalentAttrs + attr)
    +    }
    +  }
    +
    +  private def appendJoinConditions(
    +      attr: Attribute,
    +      expr: Expression,
    +      equivalenceMap: Map[SemanticExpression, Set[Attribute]],
    +      joinConditions: Set[EqualTo]): Set[EqualTo] = {
    +
    +    equivalenceMap.get(expr) match {
    +      case Some(equivalentAttrs) => joinConditions ++ equivalentAttrs.map(EqualTo(attr, _))
    +      case None => joinConditions
    +    }
    +  }
    +
    +  // the purpose of this class is to treat 'a === 1 and 1 === 'a as the same expressions
    +  implicit class SemanticExpression(private val expr: Expression) {
    --- End diff --
    
    I am afraid ``ExpressionSet`` will not help here, since we need to map a semantically equivalent expression to the set of attributes that correspond to it; it is not enough to check whether an equivalent expression exists. Therefore, ``EquivalentExpressions`` and ``ExpressionSet`` are not applicable (as far as I can see). 
    
    ``EquivalentExpressionMap`` from the previous comment assumes the following workflow:
    
    ```
    val equivalentExpressionMap = new EquivalentExpressionMap
    ...
    equivalentExpressionMap.put(1 * 2, t1.a)
    equivalentExpressionMap.put(3, t1.b)
    ...
    equivalentExpressionMap.get(1 * 2) // Set(t1.a)
    equivalentExpressionMap.get(2 * 1) // Set(t1.a)
    
    ```
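The lookup behavior described above, where `1 * 2` and `2 * 1` resolve to the same entry, can be sketched by canonicalizing keys before storing and retrieving them. This is a hypothetical illustration, not the `EquivalentExpressionMap` from the PR: the expression types are toy classes, and `canonicalize` only orders the operands of commutative `Add` and `Multiply` nodes (Catalyst performs its own, more thorough canonicalization for `semanticEquals`).

```scala
import scala.collection.mutable

// Toy expression tree (hypothetical; not Catalyst's classes).
sealed trait Expr
final case class Num(value: Int) extends Expr
final case class Col(name: String) extends Expr
final case class Add(left: Expr, right: Expr) extends Expr
final case class Multiply(left: Expr, right: Expr) extends Expr

object Canonical {
  // Arbitrary but deterministic ordering for operands of commutative operators.
  private def sortKey(e: Expr): String = e.toString

  private def ordered(l: Expr, r: Expr, build: (Expr, Expr) => Expr): Expr =
    if (sortKey(l) <= sortKey(r)) build(l, r) else build(r, l)

  // Canonicalize children first, then put commutative operands in a fixed order.
  def canonicalize(e: Expr): Expr = e match {
    case Add(l, r)      => ordered(canonicalize(l), canonicalize(r), (a, b) => Add(a, b))
    case Multiply(l, r) => ordered(canonicalize(l), canonicalize(r), (a, b) => Multiply(a, b))
    case leaf           => leaf
  }
}

// Maps an expression (up to canonical equivalence) to the attributes known to equal it.
class EquivalentExpressionMapSketch {
  private val underlying = mutable.Map.empty[Expr, Set[Col]]

  def put(expr: Expr, attr: Col): Unit = {
    val key = Canonical.canonicalize(expr)
    underlying.update(key, underlying.getOrElse(key, Set.empty[Col]) + attr)
  }

  def get(expr: Expr): Set[Col] =
    underlying.getOrElse(Canonical.canonicalize(expr), Set.empty[Col])
}
```

Unlike a set of expressions, the map keeps the attributes associated with each equivalence class, which is the requirement stated in the comment above.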


---



[GitHub] spark issue #18692: [SPARK-21417][SQL] Infer join conditions using propagate...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18692
  
    **[Test build #84177 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84177/testReport)** for PR 18692 at commit [`3e090f9`](https://github.com/apache/spark/commit/3e090f9e4e6be8efb814e2e809a0718a0c670718).


---



[GitHub] spark issue #18692: [SPARK-21417][SQL] Infer join conditions using propagate...

Posted by aokolnychyi <gi...@git.apache.org>.
Github user aokolnychyi commented on the issue:

    https://github.com/apache/spark/pull/18692
  
    @SimonBin The initial solution handled your case, but it was later decided to restrict the proposed rule to cross joins only. You can find the reason in this [comment](https://github.com/apache/spark/pull/18692#issuecomment-326694822) or in the code.


---
