You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by tejasapatil <gi...@git.apache.org> on 2017/08/25 15:14:18 UTC

[GitHub] spark pull request #19054: [SPARK-18067] Avoid shuffling child if join keys ...

GitHub user tejasapatil opened a pull request:

    https://github.com/apache/spark/pull/19054

    [SPARK-18067] Avoid shuffling child if join keys are superset of child's partitioning keys

    Jira : https://issues.apache.org/jira/browse/SPARK-18067
    
    ## What problem is being addressed in this PR ?
    
    Currently shuffle based joins require its children to be shuffled over all the columns in the join condition. In case the child node is already distributed over a subset of columns in the join condition, this shuffle is not needed (eg. if the input is bucketed, if the input is output of a subquery). Avoiding the shuffle makes the join run faster and more stably as its single stage.
    
    To dive deeper, lets look at this example. Both input tables `table1` and `table2` are bucketed on columns `i` and `j` and have 8 buckets. The query is joining the 2 tables over `i,j,k`. With bucketing, all the rows with the same values of `i` and `j` should reside in the same bucket of both the inputs. So, if we simply sort the corresponding buckets over the join columns and perform the join, that would suffice the requirements.
    
    | partitions  | table1 (i,j,k) values | table2 (i,j,k) values  |
    | ------------- | ------------- | ------------- |
    | bucket 0  | (0,0,1)  (0,0,2)  (1,0,4) | (0,0,1)  (0,0,3) |
    | bucket 1  | (1,0,2) (1,1,1) | (1,0,1) (1,0,2) (1,1,2) |
    | bucket 2  | (0,1,8) (0,1,6) | (0,1,1) |
    
    ## What changes were proposed in this pull request?
    
    Both shuffled hash join and sort merge join would not keep track of `which keys should the children be distributed on ?`. To start off, this is same as the join keys. The rule `ReorderJoinPredicates` is modified to detect if the child's output partitioning is over a subset of join keys and based on that the distribution keys for the join operator are revised.
    
    ## How was this patch tested?
    
    Added unit test.
    
    Here is manual test:
    
    Query:
    ```
    SELECT * FROM table1 a JOIN table2 b ON a.i = b.i AND a.j = b.j AND a.k = b.k
    ```
    
    BEFORE
    ```
    SortMergeJoin [i#5, j#6, k#7], [i#8, j#9, k#10], Inner
    :- *Sort [i#5 ASC NULLS FIRST, j#6 ASC NULLS FIRST, k#7 ASC NULLS FIRST], false, 0
    :  +- Exchange hashpartitioning(i#5, j#6, k#7, 200)
    :     +- *FileScan orc default.table1[i#5,j#6,k#7] Batched: false, Format: ORC, Location: InMemoryFileIndex[file:warehouse/table1], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<i:int,j:int,k:string>
    +- *Sort [i#8 ASC NULLS FIRST, j#9 ASC NULLS FIRST, k#10 ASC NULLS FIRST], false, 0
       +- Exchange hashpartitioning(i#8, j#9, k#10, 200)
          +- *FileScan orc default.table2[i#8,j#9,k#10] Batched: false, Format: ORC, Location: InMemoryFileIndex[file:warehouse/table2], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<i:int,j:int,k:string>
    ```
    
    AFTER
    ```
    SortMergeJoin [i#5, j#6, k#7], [i#8, j#9, k#10], [i#5, j#6], [i#8, j#9], Inner
    :- *Sort [i#5 ASC NULLS FIRST, j#6 ASC NULLS FIRST, k#7 ASC NULLS FIRST], false, 0
    :  +- *FileScan orc default.table1[i#5,j#6,k#7] Batched: false, Format: ORC, Location: InMemoryFileIndex[file:warehouse/table1], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<i:int,j:int,k:string>
    +- *Sort [i#8 ASC NULLS FIRST, j#9 ASC NULLS FIRST, k#10 ASC NULLS FIRST], false, 0
       +- *FileScan orc default.table2[i#8,j#9,k#10] Batched: false, Format: ORC, Location: InMemoryFileIndex[file:warehouse/table2], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<i:int,j:int,k:string>
    ```


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tejasapatil/spark SPARK-18067_take2

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/19054.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #19054
    
----
commit 9ba8add397f00ac8674f5c0194aa606c99733532
Author: Tejas Patil <te...@fb.com>
Date:   2017-08-25T14:29:28Z

    [SPARK-18067] Avoid shuffling child if join keys are superset of child's partitioning keys

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19054: [SPARK-18067] Avoid shuffling child if join keys are sup...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19054
  
    **[Test build #84368 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84368/testReport)** for PR 19054 at commit [`69e288e`](https://github.com/apache/spark/commit/69e288efda8dbaab667f0e9ed6c7f2cb811f79b1).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19054: [SPARK-18067] Avoid shuffling child if join keys are sup...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19054
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81562/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19054: [SPARK-18067] Avoid shuffling child if join keys are sup...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19054
  
    **[Test build #86406 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86406/testReport)** for PR 19054 at commit [`00bb14b`](https://github.com/apache/spark/commit/00bb14b0145a2bd42c8b4c8a9d4f108322804f71).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19054: [SPARK-18067] Avoid shuffling child if join keys ...

Posted by tejasapatil <gi...@git.apache.org>.
Github user tejasapatil commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19054#discussion_r162768516
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala ---
    @@ -220,45 +220,76 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
         operator.withNewChildren(children)
       }
     
    +  private def isSubset(biggerSet: Seq[Expression], smallerSet: Seq[Expression]): Boolean =
    +    smallerSet.length <= biggerSet.length &&
    +      smallerSet.forall(x => biggerSet.exists(_.semanticEquals(x)))
    +
       private def reorder(
           leftKeys: Seq[Expression],
           rightKeys: Seq[Expression],
    -      expectedOrderOfKeys: Seq[Expression],
    -      currentOrderOfKeys: Seq[Expression]): (Seq[Expression], Seq[Expression]) = {
    -    val leftKeysBuffer = ArrayBuffer[Expression]()
    -    val rightKeysBuffer = ArrayBuffer[Expression]()
    +      expectedOrderOfKeys: Seq[Expression], // comes from child's output partitioning
    +      currentOrderOfKeys: Seq[Expression]): // comes from join predicate
    +  (Seq[Expression], Seq[Expression], Seq[Expression], Seq[Expression]) = {
    +
    +    assert(leftKeys.length == rightKeys.length)
    +
    +    val allLeftKeys = ArrayBuffer[Expression]()
    +    val allRightKeys = ArrayBuffer[Expression]()
    +    val reorderedLeftKeys = ArrayBuffer[Expression]()
    +    val reorderedRightKeys = ArrayBuffer[Expression]()
    +    val processedIndicies = mutable.Set[Int]()
     
         expectedOrderOfKeys.foreach(expression => {
    -      val index = currentOrderOfKeys.indexWhere(e => e.semanticEquals(expression))
    -      leftKeysBuffer.append(leftKeys(index))
    -      rightKeysBuffer.append(rightKeys(index))
    +      val index = currentOrderOfKeys.zipWithIndex.find { case (currKey, i) =>
    +        !processedIndicies.contains(i) && currKey.semanticEquals(expression)
    +      }.get._2
    +      processedIndicies.add(index)
    +
    +      reorderedLeftKeys.append(leftKeys(index))
    +      allLeftKeys.append(leftKeys(index))
    +
    +      reorderedRightKeys.append(rightKeys(index))
    +      allRightKeys.append(rightKeys(index))
         })
    -    (leftKeysBuffer, rightKeysBuffer)
    +
    +    // If len(currentOrderOfKeys) > len(expectedOrderOfKeys), then the re-ordering won't have
    +    // all the keys. Append the remaining keys to the end so that we are covering all the keys
    +    for (i <- leftKeys.indices) {
    +      if (!processedIndicies.contains(i)) {
    +        allLeftKeys.append(leftKeys(i))
    +        allRightKeys.append(rightKeys(i))
    +      }
    +    }
    +
    +    assert(allLeftKeys.length == leftKeys.length)
    +    assert(allRightKeys.length == rightKeys.length)
    +    assert(reorderedLeftKeys.length == reorderedRightKeys.length)
    +
    +    (allLeftKeys, reorderedLeftKeys, allRightKeys, reorderedRightKeys)
       }
     
       private def reorderJoinKeys(
           leftKeys: Seq[Expression],
           rightKeys: Seq[Expression],
           leftPartitioning: Partitioning,
    -      rightPartitioning: Partitioning): (Seq[Expression], Seq[Expression]) = {
    +      rightPartitioning: Partitioning):
    +  (Seq[Expression], Seq[Expression], Seq[Expression], Seq[Expression]) = {
    +
         if (leftKeys.forall(_.deterministic) && rightKeys.forall(_.deterministic)) {
           leftPartitioning match {
    -        case HashPartitioning(leftExpressions, _)
    -          if leftExpressions.length == leftKeys.length &&
    -            leftKeys.forall(x => leftExpressions.exists(_.semanticEquals(x))) =>
    +        case HashPartitioning(leftExpressions, _) if isSubset(leftKeys, leftExpressions) =>
               reorder(leftKeys, rightKeys, leftExpressions, leftKeys)
    --- End diff --
    
    given that this was only done over `SortMergeJoinExec` and `ShuffledHashJoinExec` where both the partitionings are `HashPartitioning`, things worked fine. I have changed this to have a stricter check.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19054: [SPARK-18067] Avoid shuffling child if join keys are sup...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19054
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19054: [SPARK-18067] Avoid shuffling child if join keys are sup...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19054
  
    **[Test build #81562 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81562/testReport)** for PR 19054 at commit [`ec8bd80`](https://github.com/apache/spark/commit/ec8bd80b75c305dcc69290580ec20e26d5ef96df).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19054: [SPARK-18067] Avoid shuffling child if join keys are sup...

Posted by tejasapatil <gi...@git.apache.org>.
Github user tejasapatil commented on the issue:

    https://github.com/apache/spark/pull/19054
  
    cc @hvanhovell @cloud-fan for review


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19054: [SPARK-18067] Avoid shuffling child if join keys are sup...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19054
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19054: [SPARK-18067] Avoid shuffling child if join keys are sup...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19054
  
    **[Test build #81562 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81562/testReport)** for PR 19054 at commit [`ec8bd80`](https://github.com/apache/spark/commit/ec8bd80b75c305dcc69290580ec20e26d5ef96df).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19054: [SPARK-18067] Avoid shuffling child if join keys are sup...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19054
  
    **[Test build #84368 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84368/testReport)** for PR 19054 at commit [`69e288e`](https://github.com/apache/spark/commit/69e288efda8dbaab667f0e9ed6c7f2cb811f79b1).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19054: [SPARK-18067] Avoid shuffling child if join keys are sup...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19054
  
    Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19054: [SPARK-18067] Avoid shuffling child if join keys ...

Posted by tejasapatil <gi...@git.apache.org>.
Github user tejasapatil commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19054#discussion_r162768446
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala ---
    @@ -220,45 +220,76 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
         operator.withNewChildren(children)
       }
     
    +  private def isSubset(biggerSet: Seq[Expression], smallerSet: Seq[Expression]): Boolean =
    +    smallerSet.length <= biggerSet.length &&
    +      smallerSet.forall(x => biggerSet.exists(_.semanticEquals(x)))
    +
       private def reorder(
           leftKeys: Seq[Expression],
           rightKeys: Seq[Expression],
    -      expectedOrderOfKeys: Seq[Expression],
    -      currentOrderOfKeys: Seq[Expression]): (Seq[Expression], Seq[Expression]) = {
    -    val leftKeysBuffer = ArrayBuffer[Expression]()
    -    val rightKeysBuffer = ArrayBuffer[Expression]()
    +      expectedOrderOfKeys: Seq[Expression], // comes from child's output partitioning
    +      currentOrderOfKeys: Seq[Expression]): // comes from join predicate
    +  (Seq[Expression], Seq[Expression], Seq[Expression], Seq[Expression]) = {
    +
    +    assert(leftKeys.length == rightKeys.length)
    +
    +    val allLeftKeys = ArrayBuffer[Expression]()
    +    val allRightKeys = ArrayBuffer[Expression]()
    +    val reorderedLeftKeys = ArrayBuffer[Expression]()
    +    val reorderedRightKeys = ArrayBuffer[Expression]()
    +    val processedIndicies = mutable.Set[Int]()
     
         expectedOrderOfKeys.foreach(expression => {
    -      val index = currentOrderOfKeys.indexWhere(e => e.semanticEquals(expression))
    -      leftKeysBuffer.append(leftKeys(index))
    -      rightKeysBuffer.append(rightKeys(index))
    +      val index = currentOrderOfKeys.zipWithIndex.find { case (currKey, i) =>
    +        !processedIndicies.contains(i) && currKey.semanticEquals(expression)
    +      }.get._2
    +      processedIndicies.add(index)
    +
    +      reorderedLeftKeys.append(leftKeys(index))
    +      allLeftKeys.append(leftKeys(index))
    +
    +      reorderedRightKeys.append(rightKeys(index))
    +      allRightKeys.append(rightKeys(index))
         })
    -    (leftKeysBuffer, rightKeysBuffer)
    +
    +    // If len(currentOrderOfKeys) > len(expectedOrderOfKeys), then the re-ordering won't have
    +    // all the keys. Append the remaining keys to the end so that we are covering all the keys
    +    for (i <- leftKeys.indices) {
    +      if (!processedIndicies.contains(i)) {
    +        allLeftKeys.append(leftKeys(i))
    +        allRightKeys.append(rightKeys(i))
    +      }
    +    }
    +
    +    assert(allLeftKeys.length == leftKeys.length)
    +    assert(allRightKeys.length == rightKeys.length)
    +    assert(reorderedLeftKeys.length == reorderedRightKeys.length)
    +
    +    (allLeftKeys, reorderedLeftKeys, allRightKeys, reorderedRightKeys)
       }
     
       private def reorderJoinKeys(
           leftKeys: Seq[Expression],
           rightKeys: Seq[Expression],
           leftPartitioning: Partitioning,
    -      rightPartitioning: Partitioning): (Seq[Expression], Seq[Expression]) = {
    +      rightPartitioning: Partitioning):
    +  (Seq[Expression], Seq[Expression], Seq[Expression], Seq[Expression]) = {
    --- End diff --
    
    added more doc. I wasn't sure how to make it easier to understand. Hope that the example helps with that


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19054: [SPARK-18067] Avoid shuffling child if join keys are sup...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19054
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/53/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19054: [SPARK-18067] Avoid shuffling child if join keys are sup...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19054
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84368/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19054: [SPARK-18067] Avoid shuffling child if join keys are sup...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19054
  
    **[Test build #81131 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81131/testReport)** for PR 19054 at commit [`9ba8add`](https://github.com/apache/spark/commit/9ba8add397f00ac8674f5c0194aa606c99733532).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19054: [SPARK-18067] Avoid shuffling child if join keys are sup...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19054
  
    **[Test build #84366 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84366/testReport)** for PR 19054 at commit [`b0db6aa`](https://github.com/apache/spark/commit/b0db6aafd3cb172062220aeeae087be9ad5b99e5).
     * This patch **fails to build**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19054: [SPARK-18067] Avoid shuffling child if join keys are sup...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19054
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19054: [SPARK-18067] Avoid shuffling child if join keys are sup...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19054
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19054: [SPARK-18067] Avoid shuffling child if join keys ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19054#discussion_r162550613
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala ---
    @@ -220,45 +220,76 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
         operator.withNewChildren(children)
       }
     
    +  private def isSubset(biggerSet: Seq[Expression], smallerSet: Seq[Expression]): Boolean =
    +    smallerSet.length <= biggerSet.length &&
    +      smallerSet.forall(x => biggerSet.exists(_.semanticEquals(x)))
    +
       private def reorder(
           leftKeys: Seq[Expression],
           rightKeys: Seq[Expression],
    -      expectedOrderOfKeys: Seq[Expression],
    -      currentOrderOfKeys: Seq[Expression]): (Seq[Expression], Seq[Expression]) = {
    -    val leftKeysBuffer = ArrayBuffer[Expression]()
    -    val rightKeysBuffer = ArrayBuffer[Expression]()
    +      expectedOrderOfKeys: Seq[Expression], // comes from child's output partitioning
    +      currentOrderOfKeys: Seq[Expression]): // comes from join predicate
    +  (Seq[Expression], Seq[Expression], Seq[Expression], Seq[Expression]) = {
    +
    +    assert(leftKeys.length == rightKeys.length)
    +
    +    val allLeftKeys = ArrayBuffer[Expression]()
    +    val allRightKeys = ArrayBuffer[Expression]()
    +    val reorderedLeftKeys = ArrayBuffer[Expression]()
    +    val reorderedRightKeys = ArrayBuffer[Expression]()
    +    val processedIndicies = mutable.Set[Int]()
     
         expectedOrderOfKeys.foreach(expression => {
    -      val index = currentOrderOfKeys.indexWhere(e => e.semanticEquals(expression))
    -      leftKeysBuffer.append(leftKeys(index))
    -      rightKeysBuffer.append(rightKeys(index))
    +      val index = currentOrderOfKeys.zipWithIndex.find { case (currKey, i) =>
    +        !processedIndicies.contains(i) && currKey.semanticEquals(expression)
    +      }.get._2
    +      processedIndicies.add(index)
    +
    +      reorderedLeftKeys.append(leftKeys(index))
    +      allLeftKeys.append(leftKeys(index))
    +
    +      reorderedRightKeys.append(rightKeys(index))
    +      allRightKeys.append(rightKeys(index))
         })
    -    (leftKeysBuffer, rightKeysBuffer)
    +
    +    // If len(currentOrderOfKeys) > len(expectedOrderOfKeys), then the re-ordering won't have
    +    // all the keys. Append the remaining keys to the end so that we are covering all the keys
    +    for (i <- leftKeys.indices) {
    +      if (!processedIndicies.contains(i)) {
    +        allLeftKeys.append(leftKeys(i))
    +        allRightKeys.append(rightKeys(i))
    +      }
    +    }
    +
    +    assert(allLeftKeys.length == leftKeys.length)
    +    assert(allRightKeys.length == rightKeys.length)
    +    assert(reorderedLeftKeys.length == reorderedRightKeys.length)
    +
    +    (allLeftKeys, reorderedLeftKeys, allRightKeys, reorderedRightKeys)
       }
     
       private def reorderJoinKeys(
           leftKeys: Seq[Expression],
           rightKeys: Seq[Expression],
           leftPartitioning: Partitioning,
    -      rightPartitioning: Partitioning): (Seq[Expression], Seq[Expression]) = {
    +      rightPartitioning: Partitioning):
    +  (Seq[Expression], Seq[Expression], Seq[Expression], Seq[Expression]) = {
    --- End diff --
    
    We should add some documentation to explain what the return value is.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19054: [SPARK-18067] Avoid shuffling child if join keys are sup...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19054
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19054: [SPARK-18067] Avoid shuffling child if join keys are sup...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19054
  
    **[Test build #85985 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85985/testReport)** for PR 19054 at commit [`c689ff1`](https://github.com/apache/spark/commit/c689ff10a29b6625751e8eee87e1280424e77b5b).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19054: [SPARK-18067] Avoid shuffling child if join keys are sup...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19054
  
    **[Test build #81131 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81131/testReport)** for PR 19054 at commit [`9ba8add`](https://github.com/apache/spark/commit/9ba8add397f00ac8674f5c0194aa606c99733532).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19054: [SPARK-18067] Avoid shuffling child if join keys are sup...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19054
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/86406/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19054: [SPARK-18067] Avoid shuffling child if join keys ...

Posted by eyalfa <gi...@git.apache.org>.
Github user eyalfa commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19054#discussion_r165861581
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala ---
    @@ -220,45 +220,99 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
         operator.withNewChildren(children)
       }
     
    +  private def isSubset(biggerSet: Seq[Expression], smallerSet: Seq[Expression]): Boolean =
    +    smallerSet.length <= biggerSet.length &&
    +      smallerSet.forall(x => biggerSet.exists(_.semanticEquals(x)))
    +
    +  /**
    +   * Reorders `leftKeys` and `rightKeys` by aligning `currentOrderOfKeys` to be a prefix of
    +   * `expectedOrderOfKeys`
    +   */
       private def reorder(
           leftKeys: Seq[Expression],
           rightKeys: Seq[Expression],
    -      expectedOrderOfKeys: Seq[Expression],
    -      currentOrderOfKeys: Seq[Expression]): (Seq[Expression], Seq[Expression]) = {
    -    val leftKeysBuffer = ArrayBuffer[Expression]()
    -    val rightKeysBuffer = ArrayBuffer[Expression]()
    +      expectedOrderOfKeys: Seq[Expression], // comes from child's output partitioning
    +      currentOrderOfKeys: Seq[Expression]): // comes from join predicate
    +  (Seq[Expression], Seq[Expression], Seq[Expression], Seq[Expression]) = {
    +
    +    assert(leftKeys.length == rightKeys.length)
    +
    +    val allLeftKeys = ArrayBuffer[Expression]()
    +    val allRightKeys = ArrayBuffer[Expression]()
    +    val reorderedLeftKeys = ArrayBuffer[Expression]()
    +    val reorderedRightKeys = ArrayBuffer[Expression]()
    +
    +    // Tracking indicies here to track to which keys are accounted. Using a set based approach
    +    // won't work because its possible that some keys are repeated in the join clause
    +    // eg. a.key1 = b.key1 AND a.key1 = b.key2
    +    val processedIndicies = mutable.Set[Int]()
     
         expectedOrderOfKeys.foreach(expression => {
    -      val index = currentOrderOfKeys.indexWhere(e => e.semanticEquals(expression))
    -      leftKeysBuffer.append(leftKeys(index))
    -      rightKeysBuffer.append(rightKeys(index))
    +      val index = currentOrderOfKeys.zipWithIndex.find { case (currKey, i) =>
    +        !processedIndicies.contains(i) && currKey.semanticEquals(expression)
    +      }.get._2
    --- End diff --
    
    is the find guaranteed to always succeed?
    if so, worth a comment on method's pre/post conditions.
    
    a getOrElse(sys error "...") might also be a good way of documenting this.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19054: [SPARK-18067] Avoid shuffling child if join keys are sup...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19054
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84366/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19054: [SPARK-18067] Avoid shuffling child if join keys are sup...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19054
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19054: [SPARK-18067] Avoid shuffling child if join keys ...

Posted by eyalfa <gi...@git.apache.org>.
Github user eyalfa commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19054#discussion_r165860433
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala ---
    @@ -220,45 +220,99 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
         operator.withNewChildren(children)
       }
     
    +  private def isSubset(biggerSet: Seq[Expression], smallerSet: Seq[Expression]): Boolean =
    +    smallerSet.length <= biggerSet.length &&
    +      smallerSet.forall(x => biggerSet.exists(_.semanticEquals(x)))
    +
    +  /**
    +   * Reorders `leftKeys` and `rightKeys` by aligning `currentOrderOfKeys` to be a prefix of
    +   * `expectedOrderOfKeys`
    +   */
       private def reorder(
           leftKeys: Seq[Expression],
           rightKeys: Seq[Expression],
    -      expectedOrderOfKeys: Seq[Expression],
    -      currentOrderOfKeys: Seq[Expression]): (Seq[Expression], Seq[Expression]) = {
    -    val leftKeysBuffer = ArrayBuffer[Expression]()
    -    val rightKeysBuffer = ArrayBuffer[Expression]()
    +      expectedOrderOfKeys: Seq[Expression], // comes from child's output partitioning
    +      currentOrderOfKeys: Seq[Expression]): // comes from join predicate
    +  (Seq[Expression], Seq[Expression], Seq[Expression], Seq[Expression]) = {
    --- End diff --
    
    can you please add a comment describing the return type? a tuple4 is not such a descriptive type :smiley: 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19054: [SPARK-18067] Avoid shuffling child if join keys are sup...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19054
  
    Build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19054: [SPARK-18067] Avoid shuffling child if join keys are sup...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19054
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85985/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19054: [SPARK-18067] Avoid shuffling child if join keys are sup...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19054
  
    **[Test build #84366 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84366/testReport)** for PR 19054 at commit [`b0db6aa`](https://github.com/apache/spark/commit/b0db6aafd3cb172062220aeeae087be9ad5b99e5).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19054: [SPARK-18067] Avoid shuffling child if join keys are sup...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19054
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/4229/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19054: [SPARK-18067] Avoid shuffling child if join keys are sup...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19054
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81131/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19054: [SPARK-18067] Avoid shuffling child if join keys are sup...

Posted by tejasapatil <gi...@git.apache.org>.
Github user tejasapatil commented on the issue:

    https://github.com/apache/spark/pull/19054
  
    cc @hvanhovell @cloud-fan for review


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19054: [SPARK-18067] Avoid shuffling child if join keys are sup...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19054
  
    **[Test build #85985 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85985/testReport)** for PR 19054 at commit [`c689ff1`](https://github.com/apache/spark/commit/c689ff10a29b6625751e8eee87e1280424e77b5b).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19054: [SPARK-18067] Avoid shuffling child if join keys are sup...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19054
  
    **[Test build #86406 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86406/testReport)** for PR 19054 at commit [`00bb14b`](https://github.com/apache/spark/commit/00bb14b0145a2bd42c8b4c8a9d4f108322804f71).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19054: [SPARK-18067] Avoid shuffling child if join keys ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19054#discussion_r162551159
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala ---
    @@ -220,45 +220,76 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
         operator.withNewChildren(children)
       }
     
    +  private def isSubset(biggerSet: Seq[Expression], smallerSet: Seq[Expression]): Boolean =
    +    smallerSet.length <= biggerSet.length &&
    +      smallerSet.forall(x => biggerSet.exists(_.semanticEquals(x)))
    +
       private def reorder(
           leftKeys: Seq[Expression],
           rightKeys: Seq[Expression],
    -      expectedOrderOfKeys: Seq[Expression],
    -      currentOrderOfKeys: Seq[Expression]): (Seq[Expression], Seq[Expression]) = {
    -    val leftKeysBuffer = ArrayBuffer[Expression]()
    -    val rightKeysBuffer = ArrayBuffer[Expression]()
    +      expectedOrderOfKeys: Seq[Expression], // comes from child's output partitioning
    +      currentOrderOfKeys: Seq[Expression]): // comes from join predicate
    +  (Seq[Expression], Seq[Expression], Seq[Expression], Seq[Expression]) = {
    +
    +    assert(leftKeys.length == rightKeys.length)
    +
    +    val allLeftKeys = ArrayBuffer[Expression]()
    +    val allRightKeys = ArrayBuffer[Expression]()
    +    val reorderedLeftKeys = ArrayBuffer[Expression]()
    +    val reorderedRightKeys = ArrayBuffer[Expression]()
    +    val processedIndicies = mutable.Set[Int]()
     
         expectedOrderOfKeys.foreach(expression => {
    -      val index = currentOrderOfKeys.indexWhere(e => e.semanticEquals(expression))
    -      leftKeysBuffer.append(leftKeys(index))
    -      rightKeysBuffer.append(rightKeys(index))
    +      val index = currentOrderOfKeys.zipWithIndex.find { case (currKey, i) =>
    +        !processedIndicies.contains(i) && currKey.semanticEquals(expression)
    +      }.get._2
    +      processedIndicies.add(index)
    +
    +      reorderedLeftKeys.append(leftKeys(index))
    +      allLeftKeys.append(leftKeys(index))
    +
    +      reorderedRightKeys.append(rightKeys(index))
    +      allRightKeys.append(rightKeys(index))
         })
    -    (leftKeysBuffer, rightKeysBuffer)
    +
    +    // If len(currentOrderOfKeys) > len(expectedOrderOfKeys), then the re-ordering won't have
    +    // all the keys. Append the remaining keys to the end so that we are covering all the keys
    +    for (i <- leftKeys.indices) {
    +      if (!processedIndicies.contains(i)) {
    +        allLeftKeys.append(leftKeys(i))
    +        allRightKeys.append(rightKeys(i))
    +      }
    +    }
    +
    +    assert(allLeftKeys.length == leftKeys.length)
    +    assert(allRightKeys.length == rightKeys.length)
    +    assert(reorderedLeftKeys.length == reorderedRightKeys.length)
    +
    +    (allLeftKeys, reorderedLeftKeys, allRightKeys, reorderedRightKeys)
       }
     
       private def reorderJoinKeys(
           leftKeys: Seq[Expression],
           rightKeys: Seq[Expression],
           leftPartitioning: Partitioning,
    -      rightPartitioning: Partitioning): (Seq[Expression], Seq[Expression]) = {
    +      rightPartitioning: Partitioning):
    +  (Seq[Expression], Seq[Expression], Seq[Expression], Seq[Expression]) = {
    +
         if (leftKeys.forall(_.deterministic) && rightKeys.forall(_.deterministic)) {
           leftPartitioning match {
    -        case HashPartitioning(leftExpressions, _)
    -          if leftExpressions.length == leftKeys.length &&
    -            leftKeys.forall(x => leftExpressions.exists(_.semanticEquals(x))) =>
    +        case HashPartitioning(leftExpressions, _) if isSubset(leftKeys, leftExpressions) =>
               reorder(leftKeys, rightKeys, leftExpressions, leftKeys)
    --- End diff --
    
    if `leftPartitioning` is `HashPartitioning`, we don't need to care about `rightPartitioning` at all?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19054: [SPARK-18067] Avoid shuffling child if join keys ...

Posted by tejasapatil <gi...@git.apache.org>.
Github user tejasapatil commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19054#discussion_r162768714
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala ---
    @@ -271,23 +325,24 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
        */
       private def reorderJoinPredicates(plan: SparkPlan): SparkPlan = {
         plan.transformUp {
    -      case BroadcastHashJoinExec(leftKeys, rightKeys, joinType, buildSide, condition, left,
    --- End diff --
    
    Removal of `BroadcastHashJoinExec` is intentional. The children are expected to have `BroadcastDistribution` or `UnspecifiedDistribution` so this method wont help here (this optimization only helps in case of shuffle based joins)


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org