You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by yucai <gi...@git.apache.org> on 2018/06/14 08:49:08 UTC

[GitHub] spark pull request #21564: [SPARK-24556][SQL] ReusedExchange should rewrite ...

GitHub user yucai opened a pull request:

    https://github.com/apache/spark/pull/21564

    [SPARK-24556][SQL] ReusedExchange should rewrite output partitioning also when child's partitioning is RangePartitioning

    ## What changes were proposed in this pull request?
    
    Currently, ReusedExchange would rewrite output partitioning if child's partitioning is HashPartitioning, but it does not do the same when child's partitioning is RangePartitioning, sometimes, it could introduce extra shuffle, see:
    
    ```
    val df = Seq(1 -> "a", 3 -> "c", 2 -> "b").toDF("i", "j")
    val df1 = df.as("t1")
    val df2 = df.as("t2")
    val t = df1.orderBy("j").join(df2.orderBy("j"), $"t1.i" === $"t2.i", "right")
    t.cache.orderBy($"t2.j").explain
    ```
    Before:
    ```
    == Physical Plan ==
    *(1) Sort [j#14 ASC NULLS FIRST], true, 0
    +- Exchange rangepartitioning(j#14 ASC NULLS FIRST, 200)
       +- InMemoryTableScan [i#5, j#6, i#13, j#14]
             +- InMemoryRelation [i#5, j#6, i#13, j#14], CachedRDDBuilder...
                   +- *(2) BroadcastHashJoin [i#5], [i#13], RightOuter, BuildLeft
                      :- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as...
                      :  +- *(1) Sort [j#6 ASC NULLS FIRST], true, 0
                      :     +- Exchange rangepartitioning(j#6 ASC NULLS FIRST, 200)
                      :        +- LocalTableScan [i#5, j#6]
                      +- *(2) Sort [j#14 ASC NULLS FIRST], true, 0
                         +- ReusedExchange [i#13, j#14], Exchange rangepartitioning(j#6 ASC NULLS FIRST, 200)
    ```
    Better plan should avoid ```Exchange rangepartitioning(j#14 ASC NULLS FIRST, 200)```, like:
    ```
    == Physical Plan ==
    *(1) Sort [j#14 ASC NULLS FIRST], true, 0
    +- InMemoryTableScan [i#5, j#6, i#13, j#14]
          +- InMemoryRelation [i#5, j#6, i#13, j#14], CachedRDDBuilder...
                +- *(2) BroadcastHashJoin [i#5], [i#13], RightOuter, BuildLeft
                   :- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
                   :  +- *(1) Sort [j#6 ASC NULLS FIRST], true, 0
                   :     +- Exchange rangepartitioning(j#6 ASC NULLS FIRST, 200)
                   :        +- LocalTableScan [i#5, j#6]
                   +- *(2) Sort [j#14 ASC NULLS FIRST], true, 0
                      +- ReusedExchange [i#13, j#14], Exchange rangepartitioning(j#6 ASC NULLS FIRST, 200)
    ```
    
    ## How was this patch tested?
    
    Add new tests.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/yucai/spark SPARK-24556

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21564.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #21564
    
----

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21564: [SPARK-24556][SQL] ReusedExchange should rewrite ...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21564#discussion_r195630214
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala ---
    @@ -170,6 +170,8 @@ case class InMemoryTableScanExec(
       override def outputPartitioning: Partitioning = {
         relation.cachedPlan.outputPartitioning match {
           case h: HashPartitioning => updateAttribute(h).asInstanceOf[HashPartitioning]
    +      case r: RangePartitioning =>
    +        r.copy(ordering = r.ordering.map(updateAttribute(_).asInstanceOf[SortOrder]))
    --- End diff --
    
    For `PartitioningCollection`, I think it is harder to treat it like `HashPartitioning` and `RangePartitioning` when replacing attributes.
    
    In above example, `PartitioningCollection` contains `HashPartitioning(i#5)` and `HashPartitioning(m#15)`, the output of `InMemoryRelation` is `[i#54, j#55, m#58, n#59]`.  Can we still replace attributes based on the location of attribute in output?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21564: [SPARK-24556][SQL] Always rewrite output partitioning in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21564
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91970/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21564: [SPARK-24556][SQL] ReusedExchange should rewrite output ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21564
  
    Can one of the admins verify this patch?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21564: [SPARK-24556][SQL] ReusedExchange should rewrite ...

Posted by yucai <gi...@git.apache.org>.
Github user yucai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21564#discussion_r195354829
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala ---
    @@ -170,6 +170,8 @@ case class InMemoryTableScanExec(
       override def outputPartitioning: Partitioning = {
         relation.cachedPlan.outputPartitioning match {
           case h: HashPartitioning => updateAttribute(h).asInstanceOf[HashPartitioning]
    +      case r: RangePartitioning =>
    +        r.copy(ordering = r.ordering.map(updateAttribute(_).asInstanceOf[SortOrder]))
    --- End diff --
    
    Good suggestion, thanks @mgaido91.
    
    @viirya Do we need consider below:
    `PartitioningCollection` in `InMemoryTableScanExec.outputPartitioning`, which is also `Expression`?
    `PartitioningCollection` and `BroadcastPartitioning` in `ReusedExchangeExec.outputPartitioning`?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21564: [SPARK-24556][SQL] Always rewrite output partitio...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21564#discussion_r196298655
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala ---
    @@ -686,6 +686,67 @@ class PlannerSuite extends SharedSQLContext {
           Range(1, 2, 1, 1)))
         df.queryExecution.executedPlan.execute()
       }
    +
    +  test("SPARK-24556: always rewrite output partitioning in ReusedExchangeExec " +
    +    "and InMemoryTableScanExec") {
    +    def checkOutputPartitioningRewrite(
    +        plans: Seq[SparkPlan],
    +        expectedPartitioningClass: Class[_]): Unit = {
    +      plans.foreach { plan =>
    +        val partitioning = plan.outputPartitioning
    +        assert(partitioning.getClass == expectedPartitioningClass)
    +        val partitionedAttrs = partitioning.asInstanceOf[Expression].references
    +        assert(partitionedAttrs.subsetOf(plan.outputSet))
    +      }
    +    }
    +
    +    def checkReusedExchangeOutputPartitioningRewrite(
    +        df: DataFrame,
    +        expectedPartitioningClass: Class[_]): Unit = {
    +      val reusedExchange = df.queryExecution.executedPlan.collect {
    +        case r: ReusedExchangeExec => r
    +      }
    +      assert(reusedExchange.size == 1)
    +      checkOutputPartitioningRewrite(reusedExchange, expectedPartitioningClass)
    +    }
    +
    +    def checkInMemoryTableScanOutputPartitioningRewrite(
    +        df: DataFrame,
    +        expectedPartitioningClass: Class[_]): Unit = {
    +      val inMemoryScan = df.queryExecution.executedPlan.collect {
    +        case m: InMemoryTableScanExec => m
    +      }
    +      assert(inMemoryScan.size == 1)
    +      checkOutputPartitioningRewrite(inMemoryScan, expectedPartitioningClass)
    +    }
    +
    +    // ReusedExchange is HashPartitioning
    +    val df1 = Seq(1 -> "a").toDF("i", "j").repartition($"i")
    +    val df2 = Seq(1 -> "a").toDF("i", "j").repartition($"i")
    +    checkReusedExchangeOutputPartitioningRewrite(df1.union(df2), classOf[HashPartitioning])
    +
    +    // ReusedExchange is RangePartitioning
    +    val df3 = Seq(1 -> "a").toDF("i", "j").orderBy($"i")
    +    val df4 = Seq(1 -> "a").toDF("i", "j").orderBy($"i")
    +    checkReusedExchangeOutputPartitioningRewrite(df3.union(df4), classOf[RangePartitioning])
    +
    +    // InMemoryTableScan is HashPartitioning
    +    df1.persist()
    --- End diff --
    
    I feel it's better to not reuse the dataframe that were used to test ReuseExchange


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21564: [SPARK-24556][SQL] ReusedExchange should rewrite output ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/21564
  
    LGTM except the test


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21564: [SPARK-24556][SQL] ReusedExchange should rewrite ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21564#discussion_r195883713
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala ---
    @@ -2270,4 +2270,15 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
         val mapWithBinaryKey = map(lit(Array[Byte](1.toByte)), lit(1))
         checkAnswer(spark.range(1).select(mapWithBinaryKey.getItem(Array[Byte](1.toByte))), Row(1))
       }
    +
    +  test("SPARK-24556: ReusedExchange should rewrite output partitioning for RangePartitioning") {
    --- End diff --
    
    this is not an end-to-end test, let's put it in `PlannerSuite` and also test cached table.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21564: [SPARK-24556][SQL] ReusedExchange should rewrite ...

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21564#discussion_r195349283
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala ---
    @@ -170,6 +170,8 @@ case class InMemoryTableScanExec(
       override def outputPartitioning: Partitioning = {
         relation.cachedPlan.outputPartitioning match {
           case h: HashPartitioning => updateAttribute(h).asInstanceOf[HashPartitioning]
    +      case r: RangePartitioning =>
    +        r.copy(ordering = r.ordering.map(updateAttribute(_).asInstanceOf[SortOrder]))
    --- End diff --
    
    why not just `updateAttribute(r)`?
    
    Moreover, in order to avoid the same issue in the future with other cases, have you considered doing something like:
    ```
    updateAttribute(relation.cachedPlan.outputPartitioning)
    ``
    ?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21564: [SPARK-24556][SQL] Always rewrite output partitio...

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21564#discussion_r195898107
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala ---
    @@ -686,6 +686,68 @@ class PlannerSuite extends SharedSQLContext {
           Range(1, 2, 1, 1)))
         df.queryExecution.executedPlan.execute()
       }
    +
    +  test("SPARK-24556: always rewrite output partitioning in InMemoryTableScanExec" +
    +    "and ReusedExchangeExec") {
    +    def checkOutputPartitioningRewrite(
    +        plans: Seq[SparkPlan],
    +        expectedPartitioningClass: Class[_]): Unit = {
    +      plans.foreach { plan =>
    +        val partitioning = plan.outputPartitioning
    +        assert(partitioning.getClass == expectedPartitioningClass)
    +        val partitionedAttrs = partitioning.asInstanceOf[Expression].references
    +        assert(partitionedAttrs.subsetOf(plan.outputSet))
    +      }
    +    }
    +
    +    def checkInMemoryTableScanOutputPartitioningRewrite(
    +        df: DataFrame,
    +        expectedPartitioningClass: Class[_]): Unit = {
    +      val inMemoryScans = df.queryExecution.executedPlan.collect {
    +        case m: InMemoryTableScanExec => m
    +      }
    +      checkOutputPartitioningRewrite(inMemoryScans, expectedPartitioningClass)
    +    }
    +
    +    def checkReusedExchangeOutputPartitioningRewrite(
    +        df: DataFrame,
    +        expectedPartitioningClass: Class[_]): Unit = {
    +      val reusedExchange = df.queryExecution.executedPlan.collect {
    +        case r: ReusedExchangeExec => r
    +      }
    +      checkOutputPartitioningRewrite(reusedExchange, expectedPartitioningClass)
    +    }
    +
    +    // InMemoryTableScan is HashPartitioning
    +    val df1 = Seq(1 -> "a").toDF("i", "j").repartition($"i").persist()
    +    val df2 = Seq(1 -> "a").toDF("i", "j").repartition($"i").persist()
    +    checkInMemoryTableScanOutputPartitioningRewrite(df1.union(df2), classOf[HashPartitioning])
    +
    +    // InMemoryTableScan is RangePartitioning
    +    val df3 = Seq(1 -> "a").toDF("i", "j").orderBy($"i").persist()
    --- End diff --
    
    probably a `spark.range` is enough instead of creating a df and ordering it


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21564: [SPARK-24556][SQL] ReusedExchange should rewrite ...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21564#discussion_r195352300
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala ---
    @@ -170,6 +170,8 @@ case class InMemoryTableScanExec(
       override def outputPartitioning: Partitioning = {
         relation.cachedPlan.outputPartitioning match {
           case h: HashPartitioning => updateAttribute(h).asInstanceOf[HashPartitioning]
    +      case r: RangePartitioning =>
    +        r.copy(ordering = r.ordering.map(updateAttribute(_).asInstanceOf[SortOrder]))
    --- End diff --
    
    Not all `Partitioning` are `Expression`. Only `HashPartitioning` and `RangePartitioning` are.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21564: [SPARK-24556][SQL] Always rewrite output partitioning in...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21564
  
    **[Test build #91970 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91970/testReport)** for PR 21564 at commit [`85dc1bc`](https://github.com/apache/spark/commit/85dc1bc4f5bc7d1f52769c5c74b25f4ee43a00c5).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21564: [SPARK-24556][SQL] ReusedExchange should rewrite ...

Posted by yucai <gi...@git.apache.org>.
Github user yucai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21564#discussion_r195652026
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala ---
    @@ -170,6 +170,8 @@ case class InMemoryTableScanExec(
       override def outputPartitioning: Partitioning = {
         relation.cachedPlan.outputPartitioning match {
           case h: HashPartitioning => updateAttribute(h).asInstanceOf[HashPartitioning]
    +      case r: RangePartitioning =>
    +        r.copy(ordering = r.ordering.map(updateAttribute(_).asInstanceOf[SortOrder]))
    --- End diff --
    
    @viirya From `updateAttribute`, `relation.cachedPlan.output` and `relation.output` one to one.
    ``` 
     private def updateAttribute(expr: Expression): Expression = {
        ....
        val attrMap = AttributeMap(relation.cachedPlan.output.zip(relation.output))
        ....
      }
    ```
    It means "[i#54, j#55, m#58, n#59]" corresponds to "[i#5, j#6, m#15, n#16]", so we can always replace `HashPartitioning(i#5)` to `HashPartitioning(i#54)`.
    Any idea?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21564: [SPARK-24556][SQL] Always rewrite output partitioning in...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21564
  
    **[Test build #91972 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91972/testReport)** for PR 21564 at commit [`7d1c2f2`](https://github.com/apache/spark/commit/7d1c2f24eb8fc99903ea05316ce7cd625c852082).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21564: [SPARK-24556][SQL] Always rewrite output partitioning in...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21564
  
    **[Test build #92074 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92074/testReport)** for PR 21564 at commit [`6744a9e`](https://github.com/apache/spark/commit/6744a9ee5fd0785572bf5a934fafe59474d98922).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21564: [SPARK-24556][SQL] Always rewrite output partitioning in...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21564
  
    **[Test build #92082 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92082/testReport)** for PR 21564 at commit [`dcd0ce9`](https://github.com/apache/spark/commit/dcd0ce9a232aa23cdd2a74f6d130007615ac9cd4).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21564: [SPARK-24556][SQL] Always rewrite output partitio...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21564#discussion_r196289446
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala ---
    @@ -686,6 +686,70 @@ class PlannerSuite extends SharedSQLContext {
           Range(1, 2, 1, 1)))
         df.queryExecution.executedPlan.execute()
       }
    +
    +  test("SPARK-24556: always rewrite output partitioning in ReusedExchangeExec" +
    +    "and InMemoryTableScanExec") {
    +    def checkOutputPartitioningRewrite(
    +        plans: Seq[SparkPlan],
    +        expectedPartitioningClass: Class[_]): Unit = {
    +      plans.foreach { plan =>
    +        val partitioning = plan.outputPartitioning
    +        assert(partitioning.getClass == expectedPartitioningClass)
    +        val partitionedAttrs = partitioning.asInstanceOf[Expression].references
    +        assert(partitionedAttrs.subsetOf(plan.outputSet))
    +      }
    +    }
    +
    +    def checkReusedExchangeOutputPartitioningRewrite(
    +        df: DataFrame,
    +        expectedPartitioningClass: Class[_]): Unit = {
    +      val reusedExchange = df.queryExecution.executedPlan.collect {
    +        case r: ReusedExchangeExec => r
    +      }
    +      assert(reusedExchange.size == 1)
    +      checkOutputPartitioningRewrite(reusedExchange, expectedPartitioningClass)
    +    }
    +
    +    def checkInMemoryTableScanOutputPartitioningRewrite(
    +        df: DataFrame,
    +        expectedPartitioningClass: Class[_]): Unit = {
    +      val inMemoryScans = df.queryExecution.executedPlan.collect {
    +        case m: InMemoryTableScanExec => m
    +      }
    +      assert(inMemoryScans.size == 2)
    +      checkOutputPartitioningRewrite(inMemoryScans, expectedPartitioningClass)
    +    }
    +
    +    // ReusedExchange is HashPartitioning
    +    val df1 = Seq(1 -> "a").toDF("i", "j").repartition($"i")
    +    val df2 = Seq(1 -> "a").toDF("i", "j").repartition($"i")
    +    checkReusedExchangeOutputPartitioningRewrite(df1.union(df2), classOf[HashPartitioning])
    +
    +    // ReusedExchange is RangePartitioning
    +    val df3 = Seq(1 -> "a").toDF("i", "j").orderBy($"i")
    +    val df4 = Seq(1 -> "a").toDF("i", "j").orderBy($"i")
    +    checkReusedExchangeOutputPartitioningRewrite(df3.union(df4), classOf[RangePartitioning])
    +
    +    // InMemoryTableScan is HashPartitioning
    +    val df5 = df1.persist()
    +    val df6 = df2.persist()
    +    checkInMemoryTableScanOutputPartitioningRewrite(df5.union(df6), classOf[HashPartitioning])
    --- End diff --
    
    union is used to trigger exchange reuse, but it's unnecessary to test cache.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21564: [SPARK-24556][SQL] ReusedExchange should rewrite output ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21564
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21564: [SPARK-24556][SQL] Always rewrite output partitioning in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21564
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/92074/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21564: [SPARK-24556][SQL] ReusedExchange should rewrite output ...

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on the issue:

    https://github.com/apache/spark/pull/21564
  
    @yucai thanks, can you please also add more UTs in order to cover all the possible cases? Thanks.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21564: [SPARK-24556][SQL] ReusedExchange should rewrite output ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21564
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91856/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21564: [SPARK-24556][SQL] ReusedExchange should rewrite output ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21564
  
    **[Test build #91829 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91829/testReport)** for PR 21564 at commit [`f37139b`](https://github.com/apache/spark/commit/f37139b2d07497af9df1984e5fb7a50931efbf9a).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21564: [SPARK-24556][SQL] ReusedExchange should rewrite output ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21564
  
    **[Test build #91829 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91829/testReport)** for PR 21564 at commit [`f37139b`](https://github.com/apache/spark/commit/f37139b2d07497af9df1984e5fb7a50931efbf9a).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21564: [SPARK-24556][SQL] ReusedExchange should rewrite output ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21564
  
    **[Test build #91856 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91856/testReport)** for PR 21564 at commit [`405ba94`](https://github.com/apache/spark/commit/405ba9441973a186569bbf733907bd9445331c34).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21564: [SPARK-24556][SQL] Always rewrite output partitio...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21564#discussion_r195897400
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala ---
    @@ -686,6 +686,68 @@ class PlannerSuite extends SharedSQLContext {
           Range(1, 2, 1, 1)))
         df.queryExecution.executedPlan.execute()
       }
    +
    +  test("SPARK-24556: always rewrite output partitioning in InMemoryTableScanExec" +
    +    "and ReusedExchangeExec") {
    +    def checkOutputPartitioningRewrite(
    +        plans: Seq[SparkPlan],
    +        expectedPartitioningClass: Class[_]): Unit = {
    +      plans.foreach { plan =>
    +        val partitioning = plan.outputPartitioning
    +        assert(partitioning.getClass == expectedPartitioningClass)
    +        val partitionedAttrs = partitioning.asInstanceOf[Expression].references
    +        assert(partitionedAttrs.subsetOf(plan.outputSet))
    +      }
    +    }
    +
    +    def checkInMemoryTableScanOutputPartitioningRewrite(
    +        df: DataFrame,
    +        expectedPartitioningClass: Class[_]): Unit = {
    +      val inMemoryScans = df.queryExecution.executedPlan.collect {
    +        case m: InMemoryTableScanExec => m
    +      }
    +      checkOutputPartitioningRewrite(inMemoryScans, expectedPartitioningClass)
    +    }
    +
    +    def checkReusedExchangeOutputPartitioningRewrite(
    +        df: DataFrame,
    +        expectedPartitioningClass: Class[_]): Unit = {
    +      val reusedExchange = df.queryExecution.executedPlan.collect {
    +        case r: ReusedExchangeExec => r
    +      }
    +      checkOutputPartitioningRewrite(reusedExchange, expectedPartitioningClass)
    +    }
    +
    +    // InMemoryTableScan is HashPartitioning
    +    val df1 = Seq(1 -> "a").toDF("i", "j").repartition($"i").persist()
    +    val df2 = Seq(1 -> "a").toDF("i", "j").repartition($"i").persist()
    +    checkInMemoryTableScanOutputPartitioningRewrite(df1.union(df2), classOf[HashPartitioning])
    +
    +    // InMemoryTableScan is RangePartitioning
    +    val df3 = Seq(1 -> "a").toDF("i", "j").orderBy($"i").persist()
    +    val df4 = Seq(1 -> "a").toDF("i", "j").orderBy($"i").persist()
    +    checkInMemoryTableScanOutputPartitioningRewrite(df3.union(df4), classOf[RangePartitioning])
    +
    +    // InMemoryTableScan is PartitioningCollection
    +    withSQLConf("spark.sql.autoBroadcastJoinThreshold" -> "0") {
    +      val df5 =
    +        Seq(1 -> "a").toDF("i", "j").join(Seq(1 -> "a").toDF("m", "n"), $"i" === $"m").persist()
    +      val df6 =
    +        Seq(1 -> "a").toDF("i", "j").join(Seq(1 -> "a").toDF("m", "n"), $"i" === $"m").persist()
    +      checkInMemoryTableScanOutputPartitioningRewrite(
    +        df5.union(df6), classOf[PartitioningCollection])
    +    }
    +
    +    // ReusedExchange is HashPartitioning
    +    val df7 = Seq(1 -> "a").toDF("i", "j").repartition($"i")
    +    val df8 = Seq(1 -> "a").toDF("i", "j").repartition($"i")
    +    checkReusedExchangeOutputPartitioningRewrite(df7.union(df8), classOf[HashPartitioning])
    +
    +    // ReusedExchange is RangePartitioning
    +    val df9 = Seq(1 -> "a").toDF("i", "j").orderBy($"i")
    +    val df10 = Seq(1 -> "a").toDF("i", "j").orderBy($"i")
    --- End diff --
    
    Seems this test can be simplified. For example the difference between df3, df4 and df9, df10 is only `persist`. You can just define the dataframes and reuse them.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21564: [SPARK-24556][SQL] Always rewrite output partitioning in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21564
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21564: [SPARK-24556][SQL] Always rewrite output partitioning in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21564
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21564: [SPARK-24556][SQL] ReusedExchange should rewrite ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21564#discussion_r195883736
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala ---
    @@ -2270,4 +2270,15 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
         val mapWithBinaryKey = map(lit(Array[Byte](1.toByte)), lit(1))
         checkAnswer(spark.range(1).select(mapWithBinaryKey.getItem(Array[Byte](1.toByte))), Row(1))
       }
    +
    +  test("SPARK-24556: ReusedExchange should rewrite output partitioning for RangePartitioning") {
    --- End diff --
    
    please also mention cached table in PR title


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21564: [SPARK-24556][SQL] Always rewrite output partitio...

Posted by yucai <gi...@git.apache.org>.
Github user yucai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21564#discussion_r195901356
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala ---
    @@ -686,6 +686,68 @@ class PlannerSuite extends SharedSQLContext {
           Range(1, 2, 1, 1)))
         df.queryExecution.executedPlan.execute()
       }
    +
    +  test("SPARK-24556: always rewrite output partitioning in InMemoryTableScanExec" +
    +    "and ReusedExchangeExec") {
    +    def checkOutputPartitioningRewrite(
    +        plans: Seq[SparkPlan],
    +        expectedPartitioningClass: Class[_]): Unit = {
    +      plans.foreach { plan =>
    +        val partitioning = plan.outputPartitioning
    +        assert(partitioning.getClass == expectedPartitioningClass)
    +        val partitionedAttrs = partitioning.asInstanceOf[Expression].references
    +        assert(partitionedAttrs.subsetOf(plan.outputSet))
    +      }
    +    }
    +
    +    def checkInMemoryTableScanOutputPartitioningRewrite(
    +        df: DataFrame,
    +        expectedPartitioningClass: Class[_]): Unit = {
    +      val inMemoryScans = df.queryExecution.executedPlan.collect {
    +        case m: InMemoryTableScanExec => m
    +      }
    +      checkOutputPartitioningRewrite(inMemoryScans, expectedPartitioningClass)
    +    }
    +
    +    def checkReusedExchangeOutputPartitioningRewrite(
    +        df: DataFrame,
    +        expectedPartitioningClass: Class[_]): Unit = {
    +      val reusedExchange = df.queryExecution.executedPlan.collect {
    +        case r: ReusedExchangeExec => r
    +      }
    +      checkOutputPartitioningRewrite(reusedExchange, expectedPartitioningClass)
    +    }
    +
    +    // InMemoryTableScan is HashPartitioning
    +    val df1 = Seq(1 -> "a").toDF("i", "j").repartition($"i").persist()
    +    val df2 = Seq(1 -> "a").toDF("i", "j").repartition($"i").persist()
    +    checkInMemoryTableScanOutputPartitioningRewrite(df1.union(df2), classOf[HashPartitioning])
    +
    +    // InMemoryTableScan is RangePartitioning
    +    val df3 = Seq(1 -> "a").toDF("i", "j").orderBy($"i").persist()
    --- End diff --
    
    They are different, in `ReusedExchange` we need Shuffle, so we need orderBy, while in `InMemoryTableScan`, we can use spark.range directly, right?
    ```
    +    // ReusedExchange is RangePartitioning
    +    val df3 = Seq(1 -> "a").toDF("i", "j").orderBy($"i")
    +    val df4 = Seq(1 -> "a").toDF("i", "j").orderBy($"i")
    +    checkReusedExchangeOutputPartitioningRewrite(df3.union(df4), classOf[RangePartitioning])
    +
    +    // InMemoryTableScan is RangePartitioning
    +    val df7 = spark.range(1, 100, 1, 10).toDF().persist()
    +    val df8 = spark.range(1, 100, 1, 10).toDF().persist()
    +    checkInMemoryTableScanOutputPartitioningRewrite(df7.union(df8), classOf[RangePartitioning])
    ```


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21564: [SPARK-24556][SQL] ReusedExchange should rewrite output ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21564
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21564: [SPARK-24556][SQL] Always rewrite output partitioning in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21564
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91968/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21564: [SPARK-24556][SQL] Always rewrite output partitioning in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21564
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21564: [SPARK-24556][SQL] Always rewrite output partitioning in...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21564
  
    **[Test build #92074 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92074/testReport)** for PR 21564 at commit [`6744a9e`](https://github.com/apache/spark/commit/6744a9ee5fd0785572bf5a934fafe59474d98922).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21564: [SPARK-24556][SQL] Always rewrite output partitioning in...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/21564
  
    retest this please.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21564: [SPARK-24556][SQL] Always rewrite output partitioning in...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21564
  
    **[Test build #91970 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91970/testReport)** for PR 21564 at commit [`85dc1bc`](https://github.com/apache/spark/commit/85dc1bc4f5bc7d1f52769c5c74b25f4ee43a00c5).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21564: [SPARK-24556][SQL] Always rewrite output partitio...

Posted by yucai <gi...@git.apache.org>.
Github user yucai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21564#discussion_r196305164
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala ---
    @@ -686,6 +686,67 @@ class PlannerSuite extends SharedSQLContext {
           Range(1, 2, 1, 1)))
         df.queryExecution.executedPlan.execute()
       }
    +
    +  test("SPARK-24556: always rewrite output partitioning in ReusedExchangeExec " +
    +    "and InMemoryTableScanExec") {
    +    def checkOutputPartitioningRewrite(
    +        plans: Seq[SparkPlan],
    --- End diff --
    
    How do you think if I need merge `check*OutputPartitioningRewrite` together?
    ```
        def checkPlanAndOutputPartitioningRewrite(
            df: DataFrame,
            expectedPlanClass: Class[_],
            expectedPartitioningClass: Class[_]): Unit = {
          val plans = df.queryExecution.executedPlan.collect {
            case r: ReusedExchangeExec => r
            case m: InMemoryTableScanExec => m
          }
          assert(plans.size == 1)
          val plan = plans.head
          assert(plan.getClass == expectedPlanClass)
          val partitioning = plan.outputPartitioning
          assert(partitioning.getClass == expectedPartitioningClass)
          val partitionedAttrs = partitioning.asInstanceOf[Expression].references
          assert(partitionedAttrs.subsetOf(plan.outputSet))
        }
    ```


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21564: [SPARK-24556][SQL] ReusedExchange should rewrite ...

Posted by yucai <gi...@git.apache.org>.
Github user yucai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21564#discussion_r195463301
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala ---
    @@ -170,6 +170,8 @@ case class InMemoryTableScanExec(
       override def outputPartitioning: Partitioning = {
         relation.cachedPlan.outputPartitioning match {
           case h: HashPartitioning => updateAttribute(h).asInstanceOf[HashPartitioning]
    +      case r: RangePartitioning =>
    +        r.copy(ordering = r.ordering.map(updateAttribute(_).asInstanceOf[SortOrder]))
    --- End diff --
    
    `PartitioningCollection` should be considered. Like below case:
    ```
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
    spark.conf.set("spark.sql.codegen.wholeStage", false)
    val df1 = Seq(1 -> "a", 3 -> "c", 2 -> "b").toDF("i", "j").as("t1")
    val df2 = Seq(1 -> "a", 3 -> "c", 2 -> "b").toDF("m", "n").as("t2")
    val d = df1.join(df2, $"t1.i" === $"t2.m")
    d.cache
    val d1 = d.as("t3")
    val d2 = d.as("t4")
    d1.join(d2, $"t3.i" === $"t4.i").explain
    ```
    ```
    SortMergeJoin [i#5], [i#54], Inner
    :- InMemoryTableScan [i#5, j#6, m#15, n#16]
    :     +- InMemoryRelation [i#5, j#6, m#15, n#16], CachedRDDBuilder
    :           +- SortMergeJoin [i#5], [m#15], Inner
    :              :- Sort [i#5 ASC NULLS FIRST], false, 0
    :              :  +- Exchange hashpartitioning(i#5, 10)
    :              :     +- LocalTableScan [i#5, j#6]
    :              +- Sort [m#15 ASC NULLS FIRST], false, 0
    :                 +- Exchange hashpartitioning(m#15, 10)
    :                    +- LocalTableScan [m#15, n#16]
    +- Sort [i#54 ASC NULLS FIRST], false, 0
       +- Exchange hashpartitioning(i#54, 10)
          +- InMemoryTableScan [i#54, j#55, m#58, n#59]
                +- InMemoryRelation [i#54, j#55, m#58, n#59], CachedRDDBuilder
                      +- SortMergeJoin [i#5], [m#15], Inner
                         :- Sort [i#5 ASC NULLS FIRST], false, 0
                         :  +- Exchange hashpartitioning(i#5, 10)
                         :     +- LocalTableScan [i#5, j#6]
                         +- Sort [m#15 ASC NULLS FIRST], false, 0
                            +- Exchange hashpartitioning(m#15, 10)
                               +- LocalTableScan [m#15, n#16]
    ```
    `Exchange hashpartitioning(i#54, 10)` is extra shuffle.
    
    How do you think?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21564: [SPARK-24556][SQL] ReusedExchange should rewrite output ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21564
  
    Can one of the admins verify this patch?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21564: [SPARK-24556][SQL] ReusedExchange should rewrite ...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21564#discussion_r195886078
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala ---
    @@ -70,7 +70,7 @@ case class ReusedExchangeExec(override val output: Seq[Attribute], child: Exchan
       }
     
       override def outputPartitioning: Partitioning = child.outputPartitioning match {
    -    case h: HashPartitioning => h.copy(expressions = h.expressions.map(updateAttr))
    +    case e: Expression => updateAttr(e).asInstanceOf[Partitioning]
         case other => other
    --- End diff --
    
    LGTM


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21564: [SPARK-24556][SQL] ReusedExchange should rewrite ...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21564#discussion_r195356161
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala ---
    @@ -170,6 +170,8 @@ case class InMemoryTableScanExec(
       override def outputPartitioning: Partitioning = {
         relation.cachedPlan.outputPartitioning match {
           case h: HashPartitioning => updateAttribute(h).asInstanceOf[HashPartitioning]
    +      case r: RangePartitioning =>
    +        r.copy(ordering = r.ordering.map(updateAttribute(_).asInstanceOf[SortOrder]))
    --- End diff --
    
    I think `PartitioningCollection` is for an operator that has multiple children. `BroadcastPartitioning` is not `Expression`.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21564: [SPARK-24556][SQL] Always rewrite output partitio...

Posted by yucai <gi...@git.apache.org>.
Github user yucai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21564#discussion_r195900959
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala ---
    @@ -686,6 +686,68 @@ class PlannerSuite extends SharedSQLContext {
           Range(1, 2, 1, 1)))
         df.queryExecution.executedPlan.execute()
       }
    +
    +  test("SPARK-24556: always rewrite output partitioning in InMemoryTableScanExec" +
    +    "and ReusedExchangeExec") {
    +    def checkOutputPartitioningRewrite(
    +        plans: Seq[SparkPlan],
    +        expectedPartitioningClass: Class[_]): Unit = {
    +      plans.foreach { plan =>
    +        val partitioning = plan.outputPartitioning
    +        assert(partitioning.getClass == expectedPartitioningClass)
    +        val partitionedAttrs = partitioning.asInstanceOf[Expression].references
    +        assert(partitionedAttrs.subsetOf(plan.outputSet))
    +      }
    +    }
    +
    +    def checkInMemoryTableScanOutputPartitioningRewrite(
    +        df: DataFrame,
    +        expectedPartitioningClass: Class[_]): Unit = {
    +      val inMemoryScans = df.queryExecution.executedPlan.collect {
    +        case m: InMemoryTableScanExec => m
    +      }
    +      checkOutputPartitioningRewrite(inMemoryScans, expectedPartitioningClass)
    +    }
    +
    +    def checkReusedExchangeOutputPartitioningRewrite(
    +        df: DataFrame,
    +        expectedPartitioningClass: Class[_]): Unit = {
    +      val reusedExchange = df.queryExecution.executedPlan.collect {
    +        case r: ReusedExchangeExec => r
    +      }
    +      checkOutputPartitioningRewrite(reusedExchange, expectedPartitioningClass)
    +    }
    +
    +    // InMemoryTableScan is HashPartitioning
    +    val df1 = Seq(1 -> "a").toDF("i", "j").repartition($"i").persist()
    +    val df2 = Seq(1 -> "a").toDF("i", "j").repartition($"i").persist()
    +    checkInMemoryTableScanOutputPartitioningRewrite(df1.union(df2), classOf[HashPartitioning])
    +
    +    // InMemoryTableScan is RangePartitioning
    +    val df3 = Seq(1 -> "a").toDF("i", "j").orderBy($"i").persist()
    --- End diff --
    
    ok, updated.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21564: [SPARK-24556][SQL] Always rewrite output partitioning in...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21564
  
    **[Test build #91972 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91972/testReport)** for PR 21564 at commit [`7d1c2f2`](https://github.com/apache/spark/commit/7d1c2f24eb8fc99903ea05316ce7cd625c852082).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21564: [SPARK-24556][SQL] Always rewrite output partitioning in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21564
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21564: [SPARK-24556][SQL] ReusedExchange should rewrite output ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21564
  
    **[Test build #91856 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91856/testReport)** for PR 21564 at commit [`405ba94`](https://github.com/apache/spark/commit/405ba9441973a186569bbf733907bd9445331c34).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21564: [SPARK-24556][SQL] Always rewrite output partitio...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21564#discussion_r196242151
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala ---
    @@ -686,6 +686,70 @@ class PlannerSuite extends SharedSQLContext {
           Range(1, 2, 1, 1)))
         df.queryExecution.executedPlan.execute()
       }
    +
    +  test("SPARK-24556: always rewrite output partitioning in ReusedExchangeExec" +
    +    "and InMemoryTableScanExec") {
    +    def checkOutputPartitioningRewrite(
    +        plans: Seq[SparkPlan],
    +        expectedPartitioningClass: Class[_]): Unit = {
    +      plans.foreach { plan =>
    +        val partitioning = plan.outputPartitioning
    +        assert(partitioning.getClass == expectedPartitioningClass)
    +        val partitionedAttrs = partitioning.asInstanceOf[Expression].references
    +        assert(partitionedAttrs.subsetOf(plan.outputSet))
    +      }
    +    }
    +
    +    def checkReusedExchangeOutputPartitioningRewrite(
    +        df: DataFrame,
    +        expectedPartitioningClass: Class[_]): Unit = {
    +      val reusedExchange = df.queryExecution.executedPlan.collect {
    +        case r: ReusedExchangeExec => r
    +      }
    +      assert(reusedExchange.size == 1)
    +      checkOutputPartitioningRewrite(reusedExchange, expectedPartitioningClass)
    +    }
    +
    +    def checkInMemoryTableScanOutputPartitioningRewrite(
    +        df: DataFrame,
    +        expectedPartitioningClass: Class[_]): Unit = {
    +      val inMemoryScans = df.queryExecution.executedPlan.collect {
    +        case m: InMemoryTableScanExec => m
    +      }
    +      assert(inMemoryScans.size == 2)
    +      checkOutputPartitioningRewrite(inMemoryScans, expectedPartitioningClass)
    +    }
    +
    +    // ReusedExchange is HashPartitioning
    +    val df1 = Seq(1 -> "a").toDF("i", "j").repartition($"i")
    +    val df2 = Seq(1 -> "a").toDF("i", "j").repartition($"i")
    +    checkReusedExchangeOutputPartitioningRewrite(df1.union(df2), classOf[HashPartitioning])
    +
    +    // ReusedExchange is RangePartitioning
    +    val df3 = Seq(1 -> "a").toDF("i", "j").orderBy($"i")
    +    val df4 = Seq(1 -> "a").toDF("i", "j").orderBy($"i")
    +    checkReusedExchangeOutputPartitioningRewrite(df3.union(df4), classOf[RangePartitioning])
    +
    +    // InMemoryTableScan is HashPartitioning
    +    val df5 = df1.persist()
    +    val df6 = df2.persist()
    +    checkInMemoryTableScanOutputPartitioningRewrite(df5.union(df6), classOf[HashPartitioning])
    --- End diff --
    
    why do we need to test table cache with union?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21564: [SPARK-24556][SQL] Always rewrite output partitio...

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21564#discussion_r195901183
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala ---
    @@ -686,6 +686,68 @@ class PlannerSuite extends SharedSQLContext {
           Range(1, 2, 1, 1)))
         df.queryExecution.executedPlan.execute()
       }
    +
    +  test("SPARK-24556: always rewrite output partitioning in InMemoryTableScanExec" +
    +    "and ReusedExchangeExec") {
    +    def checkOutputPartitioningRewrite(
    +        plans: Seq[SparkPlan],
    +        expectedPartitioningClass: Class[_]): Unit = {
    +      plans.foreach { plan =>
    +        val partitioning = plan.outputPartitioning
    +        assert(partitioning.getClass == expectedPartitioningClass)
    +        val partitionedAttrs = partitioning.asInstanceOf[Expression].references
    +        assert(partitionedAttrs.subsetOf(plan.outputSet))
    +      }
    +    }
    +
    +    def checkInMemoryTableScanOutputPartitioningRewrite(
    +        df: DataFrame,
    +        expectedPartitioningClass: Class[_]): Unit = {
    +      val inMemoryScans = df.queryExecution.executedPlan.collect {
    +        case m: InMemoryTableScanExec => m
    +      }
    +      checkOutputPartitioningRewrite(inMemoryScans, expectedPartitioningClass)
    +    }
    +
    +    def checkReusedExchangeOutputPartitioningRewrite(
    +        df: DataFrame,
    +        expectedPartitioningClass: Class[_]): Unit = {
    +      val reusedExchange = df.queryExecution.executedPlan.collect {
    +        case r: ReusedExchangeExec => r
    +      }
    +      checkOutputPartitioningRewrite(reusedExchange, expectedPartitioningClass)
    +    }
    +
    +    // InMemoryTableScan is HashPartitioning
    +    val df1 = Seq(1 -> "a").toDF("i", "j").repartition($"i").persist()
    +    val df2 = Seq(1 -> "a").toDF("i", "j").repartition($"i").persist()
    +    checkInMemoryTableScanOutputPartitioningRewrite(df1.union(df2), classOf[HashPartitioning])
    +
    +    // InMemoryTableScan is RangePartitioning
    +    val df3 = Seq(1 -> "a").toDF("i", "j").orderBy($"i").persist()
    --- End diff --
    
    why didn't you just set:
    
    ```
    val df3 = spark.range ...
    val df4 = spark.range ...
    ```
    but you let them as before and than you changed the other place where they were used?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21564: [SPARK-24556][SQL] Always rewrite output partitioning in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21564
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21564: [SPARK-24556][SQL] ReusedExchange should rewrite output ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21564
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91855/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21564: [SPARK-24556][SQL] Always rewrite output partitio...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/21564


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21564: [SPARK-24556][SQL] Always rewrite output partitio...

Posted by yucai <gi...@git.apache.org>.
Github user yucai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21564#discussion_r196267539
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala ---
    @@ -686,6 +686,70 @@ class PlannerSuite extends SharedSQLContext {
           Range(1, 2, 1, 1)))
         df.queryExecution.executedPlan.execute()
       }
    +
    +  test("SPARK-24556: always rewrite output partitioning in ReusedExchangeExec" +
    +    "and InMemoryTableScanExec") {
    +    def checkOutputPartitioningRewrite(
    +        plans: Seq[SparkPlan],
    +        expectedPartitioningClass: Class[_]): Unit = {
    +      plans.foreach { plan =>
    +        val partitioning = plan.outputPartitioning
    +        assert(partitioning.getClass == expectedPartitioningClass)
    +        val partitionedAttrs = partitioning.asInstanceOf[Expression].references
    +        assert(partitionedAttrs.subsetOf(plan.outputSet))
    +      }
    +    }
    +
    +    def checkReusedExchangeOutputPartitioningRewrite(
    +        df: DataFrame,
    +        expectedPartitioningClass: Class[_]): Unit = {
    +      val reusedExchange = df.queryExecution.executedPlan.collect {
    +        case r: ReusedExchangeExec => r
    +      }
    +      assert(reusedExchange.size == 1)
    +      checkOutputPartitioningRewrite(reusedExchange, expectedPartitioningClass)
    +    }
    +
    +    def checkInMemoryTableScanOutputPartitioningRewrite(
    +        df: DataFrame,
    +        expectedPartitioningClass: Class[_]): Unit = {
    +      val inMemoryScans = df.queryExecution.executedPlan.collect {
    +        case m: InMemoryTableScanExec => m
    +      }
    +      assert(inMemoryScans.size == 2)
    +      checkOutputPartitioningRewrite(inMemoryScans, expectedPartitioningClass)
    +    }
    +
    +    // ReusedExchange is HashPartitioning
    +    val df1 = Seq(1 -> "a").toDF("i", "j").repartition($"i")
    +    val df2 = Seq(1 -> "a").toDF("i", "j").repartition($"i")
    +    checkReusedExchangeOutputPartitioningRewrite(df1.union(df2), classOf[HashPartitioning])
    +
    +    // ReusedExchange is RangePartitioning
    +    val df3 = Seq(1 -> "a").toDF("i", "j").orderBy($"i")
    +    val df4 = Seq(1 -> "a").toDF("i", "j").orderBy($"i")
    +    checkReusedExchangeOutputPartitioningRewrite(df3.union(df4), classOf[RangePartitioning])
    +
    +    // InMemoryTableScan is HashPartitioning
    +    val df5 = df1.persist()
    +    val df6 = df2.persist()
    +    checkInMemoryTableScanOutputPartitioningRewrite(df5.union(df6), classOf[HashPartitioning])
    --- End diff --
    
    I want to make sure both cache have the right output partitioning, so test the second cache table only?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21564: [SPARK-24556][SQL] ReusedExchange should rewrite output ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21564
  
    **[Test build #91855 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91855/testReport)** for PR 21564 at commit [`0ef99cc`](https://github.com/apache/spark/commit/0ef99cc972a54fd9c98338e54a7e4e6b9a213654).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21564: [SPARK-24556][SQL] ReusedExchange should rewrite ...

Posted by yucai <gi...@git.apache.org>.
Github user yucai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21564#discussion_r195361725
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala ---
    @@ -170,6 +170,8 @@ case class InMemoryTableScanExec(
       override def outputPartitioning: Partitioning = {
         relation.cachedPlan.outputPartitioning match {
           case h: HashPartitioning => updateAttribute(h).asInstanceOf[HashPartitioning]
    +      case r: RangePartitioning =>
    +        r.copy(ordering = r.ordering.map(updateAttribute(_).asInstanceOf[SortOrder]))
    --- End diff --
    
    `BroadcastPartitioning`'s `BroadcastMode` contains `Expression`?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21564: [SPARK-24556][SQL] Always rewrite output partitioning in...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21564
  
    **[Test build #92080 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92080/testReport)** for PR 21564 at commit [`dcd0ce9`](https://github.com/apache/spark/commit/dcd0ce9a232aa23cdd2a74f6d130007615ac9cd4).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21564: [SPARK-24556][SQL] ReusedExchange should rewrite output ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21564
  
    **[Test build #91855 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91855/testReport)** for PR 21564 at commit [`0ef99cc`](https://github.com/apache/spark/commit/0ef99cc972a54fd9c98338e54a7e4e6b9a213654).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21564: [SPARK-24556][SQL] Always rewrite output partitioning in...

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on the issue:

    https://github.com/apache/spark/pull/21564
  
    LGTM


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21564: [SPARK-24556][SQL] Always rewrite output partitioning in...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21564
  
    **[Test build #91968 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91968/testReport)** for PR 21564 at commit [`9da85e0`](https://github.com/apache/spark/commit/9da85e0122c1f114258665f29a2390975573e5b9).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21564: [SPARK-24556][SQL] ReusedExchange should rewrite output ...

Posted by yucai <gi...@git.apache.org>.
Github user yucai commented on the issue:

    https://github.com/apache/spark/pull/21564
  
    @mgaido91 I update the codes as per your suggestion, thanks!


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21564: [SPARK-24556][SQL] ReusedExchange should rewrite output ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21564
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21564: [SPARK-24556][SQL] Always rewrite output partitioning in...

Posted by yucai <gi...@git.apache.org>.
Github user yucai commented on the issue:

    https://github.com/apache/spark/pull/21564
  
    retest this please.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21564: [SPARK-24556][SQL] Always rewrite output partitioning in...

Posted by yucai <gi...@git.apache.org>.
Github user yucai commented on the issue:

    https://github.com/apache/spark/pull/21564
  
    @cloud-fan thanks for reviewing, tests have been updated.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21564: [SPARK-24556][SQL] Always rewrite output partitioning in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21564
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21564: [SPARK-24556][SQL] ReusedExchange should rewrite output ...

Posted by yucai <gi...@git.apache.org>.
Github user yucai commented on the issue:

    https://github.com/apache/spark/pull/21564
  
    @viirya I think`PartitioningCollection` should be considered. Like below case:
    ```
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
    spark.conf.set("spark.sql.codegen.wholeStage", false)
    val df1 = Seq(1 -> "a", 3 -> "c", 2 -> "b").toDF("i", "j").as("t1")
    val df2 = Seq(1 -> "a", 3 -> "c", 2 -> "b").toDF("m", "n").as("t2")
    val d = df1.join(df2, $"t1.i" === $"t2.m")
    d.cache
    val d1 = d.as("t3")
    val d2 = d.as("t4")
    d1.join(d2, $"t3.i" === $"t4.i").explain
    ```
    ```
    SortMergeJoin [i#5], [i#54], Inner
    :- InMemoryTableScan [i#5, j#6, m#15, n#16]
    :     +- InMemoryRelation [i#5, j#6, m#15, n#16], CachedRDDBuilder
    :           +- SortMergeJoin [i#5], [m#15], Inner
    :              :- Sort [i#5 ASC NULLS FIRST], false, 0
    :              :  +- Exchange hashpartitioning(i#5, 10)
    :              :     +- LocalTableScan [i#5, j#6]
    :              +- Sort [m#15 ASC NULLS FIRST], false, 0
    :                 +- Exchange hashpartitioning(m#15, 10)
    :                    +- LocalTableScan [m#15, n#16]
    +- Sort [i#54 ASC NULLS FIRST], false, 0
       +- Exchange hashpartitioning(i#54, 10)
          +- InMemoryTableScan [i#54, j#55, m#58, n#59]
                +- InMemoryRelation [i#54, j#55, m#58, n#59], CachedRDDBuilder
                      +- SortMergeJoin [i#5], [m#15], Inner
                         :- Sort [i#5 ASC NULLS FIRST], false, 0
                         :  +- Exchange hashpartitioning(i#5, 10)
                         :     +- LocalTableScan [i#5, j#6]
                         +- Sort [m#15 ASC NULLS FIRST], false, 0
                            +- Exchange hashpartitioning(m#15, 10)
                               +- LocalTableScan [m#15, n#16]
    ```
    `Exchange hashpartitioning(i#54, 10)` is extra shuffle.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21564: [SPARK-24556][SQL] Always rewrite output partitioning in...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21564
  
    **[Test build #92080 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92080/testReport)** for PR 21564 at commit [`dcd0ce9`](https://github.com/apache/spark/commit/dcd0ce9a232aa23cdd2a74f6d130007615ac9cd4).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21564: [SPARK-24556][SQL] Always rewrite output partitioning in...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21564
  
    **[Test build #92082 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92082/testReport)** for PR 21564 at commit [`dcd0ce9`](https://github.com/apache/spark/commit/dcd0ce9a232aa23cdd2a74f6d130007615ac9cd4).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21564: [SPARK-24556][SQL] Always rewrite output partitio...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21564#discussion_r196298480
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala ---
    @@ -686,6 +686,67 @@ class PlannerSuite extends SharedSQLContext {
           Range(1, 2, 1, 1)))
         df.queryExecution.executedPlan.execute()
       }
    +
    +  test("SPARK-24556: always rewrite output partitioning in ReusedExchangeExec " +
    +    "and InMemoryTableScanExec") {
    +    def checkOutputPartitioningRewrite(
    +        plans: Seq[SparkPlan],
    --- End diff --
    
    now we can take a single spark plan


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21564: [SPARK-24556][SQL] Always rewrite output partitio...

Posted by yucai <gi...@git.apache.org>.
Github user yucai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21564#discussion_r196299220
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala ---
    @@ -686,6 +686,67 @@ class PlannerSuite extends SharedSQLContext {
           Range(1, 2, 1, 1)))
         df.queryExecution.executedPlan.execute()
       }
    +
    +  test("SPARK-24556: always rewrite output partitioning in ReusedExchangeExec " +
    +    "and InMemoryTableScanExec") {
    +    def checkOutputPartitioningRewrite(
    +        plans: Seq[SparkPlan],
    +        expectedPartitioningClass: Class[_]): Unit = {
    +      plans.foreach { plan =>
    +        val partitioning = plan.outputPartitioning
    +        assert(partitioning.getClass == expectedPartitioningClass)
    +        val partitionedAttrs = partitioning.asInstanceOf[Expression].references
    +        assert(partitionedAttrs.subsetOf(plan.outputSet))
    +      }
    +    }
    +
    +    def checkReusedExchangeOutputPartitioningRewrite(
    +        df: DataFrame,
    +        expectedPartitioningClass: Class[_]): Unit = {
    +      val reusedExchange = df.queryExecution.executedPlan.collect {
    +        case r: ReusedExchangeExec => r
    +      }
    +      assert(reusedExchange.size == 1)
    +      checkOutputPartitioningRewrite(reusedExchange, expectedPartitioningClass)
    +    }
    +
    +    def checkInMemoryTableScanOutputPartitioningRewrite(
    +        df: DataFrame,
    +        expectedPartitioningClass: Class[_]): Unit = {
    +      val inMemoryScan = df.queryExecution.executedPlan.collect {
    +        case m: InMemoryTableScanExec => m
    +      }
    +      assert(inMemoryScan.size == 1)
    +      checkOutputPartitioningRewrite(inMemoryScan, expectedPartitioningClass)
    +    }
    +
    +    // ReusedExchange is HashPartitioning
    +    val df1 = Seq(1 -> "a").toDF("i", "j").repartition($"i")
    +    val df2 = Seq(1 -> "a").toDF("i", "j").repartition($"i")
    +    checkReusedExchangeOutputPartitioningRewrite(df1.union(df2), classOf[HashPartitioning])
    +
    +    // ReusedExchange is RangePartitioning
    +    val df3 = Seq(1 -> "a").toDF("i", "j").orderBy($"i")
    +    val df4 = Seq(1 -> "a").toDF("i", "j").orderBy($"i")
    +    checkReusedExchangeOutputPartitioningRewrite(df3.union(df4), classOf[RangePartitioning])
    +
    +    // InMemoryTableScan is HashPartitioning
    +    df1.persist()
    --- End diff --
    
    Agree, I also like a new one :).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21564: [SPARK-24556][SQL] Always rewrite output partitioning in...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/21564
  
    Thanks for fixing this! LGTM


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21564: [SPARK-24556][SQL] Always rewrite output partitioning in...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21564
  
    **[Test build #92068 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92068/testReport)** for PR 21564 at commit [`6744a9e`](https://github.com/apache/spark/commit/6744a9ee5fd0785572bf5a934fafe59474d98922).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21564: [SPARK-24556][SQL] ReusedExchange should rewrite ...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21564#discussion_r195348233
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala ---
    @@ -170,6 +170,8 @@ case class InMemoryTableScanExec(
       override def outputPartitioning: Partitioning = {
         relation.cachedPlan.outputPartitioning match {
           case h: HashPartitioning => updateAttribute(h).asInstanceOf[HashPartitioning]
    +      case r: RangePartitioning =>
    +        r.copy(ordering = r.ordering.map(updateAttribute(_).asInstanceOf[SortOrder]))
    --- End diff --
    
    Not sure why `RangePartitioning` isn't included at first.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21564: [SPARK-24556][SQL] ReusedExchange should rewrite output ...

Posted by yucai <gi...@git.apache.org>.
Github user yucai commented on the issue:

    https://github.com/apache/spark/pull/21564
  
    @cloud-fan @viirya @gatorsmile , could you help review this?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21564: [SPARK-24556][SQL] Always rewrite output partitioning in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21564
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/92068/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21564: [SPARK-24556][SQL] Always rewrite output partitio...

Posted by yucai <gi...@git.apache.org>.
Github user yucai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21564#discussion_r195898606
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala ---
    @@ -686,6 +686,68 @@ class PlannerSuite extends SharedSQLContext {
           Range(1, 2, 1, 1)))
         df.queryExecution.executedPlan.execute()
       }
    +
    +  test("SPARK-24556: always rewrite output partitioning in InMemoryTableScanExec" +
    +    "and ReusedExchangeExec") {
    +    def checkOutputPartitioningRewrite(
    +        plans: Seq[SparkPlan],
    +        expectedPartitioningClass: Class[_]): Unit = {
    +      plans.foreach { plan =>
    +        val partitioning = plan.outputPartitioning
    +        assert(partitioning.getClass == expectedPartitioningClass)
    +        val partitionedAttrs = partitioning.asInstanceOf[Expression].references
    +        assert(partitionedAttrs.subsetOf(plan.outputSet))
    +      }
    +    }
    +
    +    def checkInMemoryTableScanOutputPartitioningRewrite(
    +        df: DataFrame,
    +        expectedPartitioningClass: Class[_]): Unit = {
    +      val inMemoryScans = df.queryExecution.executedPlan.collect {
    +        case m: InMemoryTableScanExec => m
    +      }
    +      checkOutputPartitioningRewrite(inMemoryScans, expectedPartitioningClass)
    +    }
    +
    +    def checkReusedExchangeOutputPartitioningRewrite(
    +        df: DataFrame,
    +        expectedPartitioningClass: Class[_]): Unit = {
    +      val reusedExchange = df.queryExecution.executedPlan.collect {
    +        case r: ReusedExchangeExec => r
    +      }
    +      checkOutputPartitioningRewrite(reusedExchange, expectedPartitioningClass)
    +    }
    +
    +    // InMemoryTableScan is HashPartitioning
    +    val df1 = Seq(1 -> "a").toDF("i", "j").repartition($"i").persist()
    +    val df2 = Seq(1 -> "a").toDF("i", "j").repartition($"i").persist()
    +    checkInMemoryTableScanOutputPartitioningRewrite(df1.union(df2), classOf[HashPartitioning])
    +
    +    // InMemoryTableScan is RangePartitioning
    +    val df3 = Seq(1 -> "a").toDF("i", "j").orderBy($"i").persist()
    --- End diff --
    
    I just have an update of tests, feel free to let me know if you are OK with the new version.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21564: [SPARK-24556][SQL] ReusedExchange should rewrite ...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21564#discussion_r195420136
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala ---
    @@ -170,6 +170,8 @@ case class InMemoryTableScanExec(
       override def outputPartitioning: Partitioning = {
         relation.cachedPlan.outputPartitioning match {
           case h: HashPartitioning => updateAttribute(h).asInstanceOf[HashPartitioning]
    +      case r: RangePartitioning =>
    +        r.copy(ordering = r.ordering.map(updateAttribute(_).asInstanceOf[SortOrder]))
    --- End diff --
    
    Hmm, `HashPartitioning` and `RangePartitioning` can affect later sorting and shuffle. But for `BroadcastPartitioning`, seems to me no such benefit. 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21564: [SPARK-24556][SQL] Always rewrite output partitioning in...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/21564
  
    thanks, merging to master!


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21564: [SPARK-24556][SQL] ReusedExchange should rewrite output ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21564
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91829/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21564: [SPARK-24556][SQL] Always rewrite output partitio...

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21564#discussion_r195898686
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala ---
    @@ -686,6 +686,68 @@ class PlannerSuite extends SharedSQLContext {
           Range(1, 2, 1, 1)))
         df.queryExecution.executedPlan.execute()
       }
    +
    +  test("SPARK-24556: always rewrite output partitioning in InMemoryTableScanExec" +
    +    "and ReusedExchangeExec") {
    +    def checkOutputPartitioningRewrite(
    +        plans: Seq[SparkPlan],
    +        expectedPartitioningClass: Class[_]): Unit = {
    +      plans.foreach { plan =>
    +        val partitioning = plan.outputPartitioning
    +        assert(partitioning.getClass == expectedPartitioningClass)
    +        val partitionedAttrs = partitioning.asInstanceOf[Expression].references
    +        assert(partitionedAttrs.subsetOf(plan.outputSet))
    +      }
    +    }
    +
    +    def checkInMemoryTableScanOutputPartitioningRewrite(
    +        df: DataFrame,
    +        expectedPartitioningClass: Class[_]): Unit = {
    +      val inMemoryScans = df.queryExecution.executedPlan.collect {
    +        case m: InMemoryTableScanExec => m
    +      }
    +      checkOutputPartitioningRewrite(inMemoryScans, expectedPartitioningClass)
    +    }
    +
    +    def checkReusedExchangeOutputPartitioningRewrite(
    +        df: DataFrame,
    +        expectedPartitioningClass: Class[_]): Unit = {
    +      val reusedExchange = df.queryExecution.executedPlan.collect {
    +        case r: ReusedExchangeExec => r
    +      }
    +      checkOutputPartitioningRewrite(reusedExchange, expectedPartitioningClass)
    +    }
    +
    +    // InMemoryTableScan is HashPartitioning
    +    val df1 = Seq(1 -> "a").toDF("i", "j").repartition($"i").persist()
    +    val df2 = Seq(1 -> "a").toDF("i", "j").repartition($"i").persist()
    +    checkInMemoryTableScanOutputPartitioningRewrite(df1.union(df2), classOf[HashPartitioning])
    +
    +    // InMemoryTableScan is RangePartitioning
    +    val df3 = Seq(1 -> "a").toDF("i", "j").orderBy($"i").persist()
    --- End diff --
    
    I am OK apart from this comment which is still unresolved in the new version. Instead of doing an unneeded sort, we can just simply have a `Range` operation which has `RangePartitioning` as output partitioning.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21564: [SPARK-24556][SQL] Always rewrite output partitioning in...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21564
  
    **[Test build #92068 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92068/testReport)** for PR 21564 at commit [`6744a9e`](https://github.com/apache/spark/commit/6744a9ee5fd0785572bf5a934fafe59474d98922).
     * This patch **fails due to an unknown error code, -9**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21564: [SPARK-24556][SQL] Always rewrite output partitio...

Posted by yucai <gi...@git.apache.org>.
Github user yucai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21564#discussion_r196335623
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala ---
    @@ -686,6 +686,67 @@ class PlannerSuite extends SharedSQLContext {
           Range(1, 2, 1, 1)))
         df.queryExecution.executedPlan.execute()
       }
    +
    +  test("SPARK-24556: always rewrite output partitioning in ReusedExchangeExec " +
    +    "and InMemoryTableScanExec") {
    +    def checkOutputPartitioningRewrite(
    +        plans: Seq[SparkPlan],
    --- End diff --
    
    @cloud-fan I still use `Seq`, so I can make `checkReusedExchangeOutputPartitioningRewrite` and `checkInMemoryTableScanOutputPartitioningRewrite` simpler. Kindly let me know if you have better idea.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21564: [SPARK-24556][SQL] ReusedExchange should rewrite ...

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21564#discussion_r195355721
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala ---
    @@ -170,6 +170,8 @@ case class InMemoryTableScanExec(
       override def outputPartitioning: Partitioning = {
         relation.cachedPlan.outputPartitioning match {
           case h: HashPartitioning => updateAttribute(h).asInstanceOf[HashPartitioning]
    +      case r: RangePartitioning =>
    +        r.copy(ordering = r.ordering.map(updateAttribute(_).asInstanceOf[SortOrder]))
    --- End diff --
    
    yes, you're right @viirya , thanks. Then, I'd propose something like:
    ```
    relation.cachedPlan.outputPartitioning match {
     case e: Expression => updateAttribute(e)
     case other => other
    }
    ```
    
    what do you think?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21564: [SPARK-24556][SQL] Always rewrite output partitioning in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21564
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91972/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21564: [SPARK-24556][SQL] Always rewrite output partitio...

Posted by yucai <gi...@git.apache.org>.
Github user yucai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21564#discussion_r195898387
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala ---
    @@ -686,6 +686,68 @@ class PlannerSuite extends SharedSQLContext {
           Range(1, 2, 1, 1)))
         df.queryExecution.executedPlan.execute()
       }
    +
    +  test("SPARK-24556: always rewrite output partitioning in InMemoryTableScanExec" +
    +    "and ReusedExchangeExec") {
    +    def checkOutputPartitioningRewrite(
    +        plans: Seq[SparkPlan],
    +        expectedPartitioningClass: Class[_]): Unit = {
    +      plans.foreach { plan =>
    +        val partitioning = plan.outputPartitioning
    +        assert(partitioning.getClass == expectedPartitioningClass)
    +        val partitionedAttrs = partitioning.asInstanceOf[Expression].references
    +        assert(partitionedAttrs.subsetOf(plan.outputSet))
    +      }
    +    }
    +
    +    def checkInMemoryTableScanOutputPartitioningRewrite(
    +        df: DataFrame,
    +        expectedPartitioningClass: Class[_]): Unit = {
    +      val inMemoryScans = df.queryExecution.executedPlan.collect {
    +        case m: InMemoryTableScanExec => m
    +      }
    +      checkOutputPartitioningRewrite(inMemoryScans, expectedPartitioningClass)
    +    }
    +
    +    def checkReusedExchangeOutputPartitioningRewrite(
    +        df: DataFrame,
    +        expectedPartitioningClass: Class[_]): Unit = {
    +      val reusedExchange = df.queryExecution.executedPlan.collect {
    +        case r: ReusedExchangeExec => r
    +      }
    +      checkOutputPartitioningRewrite(reusedExchange, expectedPartitioningClass)
    +    }
    +
    +    // InMemoryTableScan is HashPartitioning
    +    val df1 = Seq(1 -> "a").toDF("i", "j").repartition($"i").persist()
    +    val df2 = Seq(1 -> "a").toDF("i", "j").repartition($"i").persist()
    +    checkInMemoryTableScanOutputPartitioningRewrite(df1.union(df2), classOf[HashPartitioning])
    +
    +    // InMemoryTableScan is RangePartitioning
    +    val df3 = Seq(1 -> "a").toDF("i", "j").orderBy($"i").persist()
    --- End diff --
    
    I want `RangePartitioning` here, so using `orderBy`.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21564: [SPARK-24556][SQL] Always rewrite output partitio...

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21564#discussion_r195898092
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala ---
    @@ -686,6 +686,68 @@ class PlannerSuite extends SharedSQLContext {
           Range(1, 2, 1, 1)))
         df.queryExecution.executedPlan.execute()
       }
    +
    +  test("SPARK-24556: always rewrite output partitioning in InMemoryTableScanExec" +
    +    "and ReusedExchangeExec") {
    +    def checkOutputPartitioningRewrite(
    +        plans: Seq[SparkPlan],
    +        expectedPartitioningClass: Class[_]): Unit = {
    +      plans.foreach { plan =>
    +        val partitioning = plan.outputPartitioning
    +        assert(partitioning.getClass == expectedPartitioningClass)
    +        val partitionedAttrs = partitioning.asInstanceOf[Expression].references
    +        assert(partitionedAttrs.subsetOf(plan.outputSet))
    +      }
    +    }
    +
    +    def checkInMemoryTableScanOutputPartitioningRewrite(
    +        df: DataFrame,
    +        expectedPartitioningClass: Class[_]): Unit = {
    +      val inMemoryScans = df.queryExecution.executedPlan.collect {
    +        case m: InMemoryTableScanExec => m
    +      }
    +      checkOutputPartitioningRewrite(inMemoryScans, expectedPartitioningClass)
    +    }
    +
    +    def checkReusedExchangeOutputPartitioningRewrite(
    +        df: DataFrame,
    +        expectedPartitioningClass: Class[_]): Unit = {
    +      val reusedExchange = df.queryExecution.executedPlan.collect {
    +        case r: ReusedExchangeExec => r
    +      }
    +      checkOutputPartitioningRewrite(reusedExchange, expectedPartitioningClass)
    +    }
    +
    +    // InMemoryTableScan is HashPartitioning
    +    val df1 = Seq(1 -> "a").toDF("i", "j").repartition($"i").persist()
    +    val df2 = Seq(1 -> "a").toDF("i", "j").repartition($"i").persist()
    +    checkInMemoryTableScanOutputPartitioningRewrite(df1.union(df2), classOf[HashPartitioning])
    +
    +    // InMemoryTableScan is RangePartitioning
    +    val df3 = Seq(1 -> "a").toDF("i", "j").orderBy($"i").persist()
    +    val df4 = Seq(1 -> "a").toDF("i", "j").orderBy($"i").persist()
    +    checkInMemoryTableScanOutputPartitioningRewrite(df3.union(df4), classOf[RangePartitioning])
    +
    +    // InMemoryTableScan is PartitioningCollection
    +    withSQLConf("spark.sql.autoBroadcastJoinThreshold" -> "0") {
    --- End diff --
    
    nit: please use `SQLConf` instead of the plain string (and the value here I think should be `-1`)


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21564: [SPARK-24556][SQL] Always rewrite output partitio...

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21564#discussion_r195901400
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala ---
    @@ -686,6 +686,68 @@ class PlannerSuite extends SharedSQLContext {
           Range(1, 2, 1, 1)))
         df.queryExecution.executedPlan.execute()
       }
    +
    +  test("SPARK-24556: always rewrite output partitioning in InMemoryTableScanExec" +
    +    "and ReusedExchangeExec") {
    +    def checkOutputPartitioningRewrite(
    +        plans: Seq[SparkPlan],
    +        expectedPartitioningClass: Class[_]): Unit = {
    +      plans.foreach { plan =>
    +        val partitioning = plan.outputPartitioning
    +        assert(partitioning.getClass == expectedPartitioningClass)
    +        val partitionedAttrs = partitioning.asInstanceOf[Expression].references
    +        assert(partitionedAttrs.subsetOf(plan.outputSet))
    +      }
    +    }
    +
    +    def checkInMemoryTableScanOutputPartitioningRewrite(
    +        df: DataFrame,
    +        expectedPartitioningClass: Class[_]): Unit = {
    +      val inMemoryScans = df.queryExecution.executedPlan.collect {
    +        case m: InMemoryTableScanExec => m
    +      }
    +      checkOutputPartitioningRewrite(inMemoryScans, expectedPartitioningClass)
    +    }
    +
    +    def checkReusedExchangeOutputPartitioningRewrite(
    +        df: DataFrame,
    +        expectedPartitioningClass: Class[_]): Unit = {
    +      val reusedExchange = df.queryExecution.executedPlan.collect {
    +        case r: ReusedExchangeExec => r
    +      }
    +      checkOutputPartitioningRewrite(reusedExchange, expectedPartitioningClass)
    +    }
    +
    +    // InMemoryTableScan is HashPartitioning
    +    val df1 = Seq(1 -> "a").toDF("i", "j").repartition($"i").persist()
    +    val df2 = Seq(1 -> "a").toDF("i", "j").repartition($"i").persist()
    +    checkInMemoryTableScanOutputPartitioningRewrite(df1.union(df2), classOf[HashPartitioning])
    +
    +    // InMemoryTableScan is RangePartitioning
    +    val df3 = Seq(1 -> "a").toDF("i", "j").orderBy($"i").persist()
    --- End diff --
    
    oh, sure, sorry, thanks.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21564: [SPARK-24556][SQL] ReusedExchange should rewrite ...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21564#discussion_r195366665
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala ---
    @@ -170,6 +170,8 @@ case class InMemoryTableScanExec(
       override def outputPartitioning: Partitioning = {
         relation.cachedPlan.outputPartitioning match {
           case h: HashPartitioning => updateAttribute(h).asInstanceOf[HashPartitioning]
    +      case r: RangePartitioning =>
    +        r.copy(ordering = r.ordering.map(updateAttribute(_).asInstanceOf[SortOrder]))
    --- End diff --
    
    Oh, like `HashedRelationBroadcastMode`.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21564: [SPARK-24556][SQL] ReusedExchange should rewrite ...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21564#discussion_r195654141
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala ---
    @@ -170,6 +170,8 @@ case class InMemoryTableScanExec(
       override def outputPartitioning: Partitioning = {
         relation.cachedPlan.outputPartitioning match {
           case h: HashPartitioning => updateAttribute(h).asInstanceOf[HashPartitioning]
    +      case r: RangePartitioning =>
    +        r.copy(ordering = r.ordering.map(updateAttribute(_).asInstanceOf[SortOrder]))
    --- End diff --
    
    Looks correct.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21564: [SPARK-24556][SQL] Always rewrite output partitioning in...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21564
  
    **[Test build #91968 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91968/testReport)** for PR 21564 at commit [`9da85e0`](https://github.com/apache/spark/commit/9da85e0122c1f114258665f29a2390975573e5b9).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21564: [SPARK-24556][SQL] Always rewrite output partitioning in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21564
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/92082/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21564: [SPARK-24556][SQL] Always rewrite output partitioning in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21564
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/92080/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21564: [SPARK-24556][SQL] Always rewrite output partitioning in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21564
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org