Posted to reviews@spark.apache.org by caneGuy <gi...@git.apache.org> on 2017/11/16 07:31:16 UTC

[GitHub] spark pull request #19764: [SPARK-22539][SQL] Add second order for rangepart...

GitHub user caneGuy opened a pull request:

    https://github.com/apache/spark/pull/19764

    [SPARK-22539][SQL] Add second order for rangepartitioner since partition number may be small if the specified key is skewed
    
    ## What changes were proposed in this pull request?
    
    The range partitioner generated by the shuffle exchange may produce skewed partitions if the sort key is skewed.
    This patch adds a second order to the range partitioner to avoid this situation.
    This improvement comes from a real-world case.
    
    ## How was this patch tested?
    
    Manually tested.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/caneGuy/spark zhoukang/add-secondorder

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/19764.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #19764
    
----
commit 29d2c869ffc6dd4d1a3cf7606cde5d03e72fa171
Author: zhoukang <zh...@gmail.com>
Date:   2017-11-15T07:24:59Z

    [SPARK][SQL] Add second order for rangepartitioner since partition number may be small if the specified key is skewed

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19764: [SPARK-22539][SQL] Add second order for rangepartitioner...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19764
  
    Can one of the admins verify this patch?


---



[GitHub] spark issue #19764: [SPARK-22539][SQL] Add second order for rangepartitioner...

Posted by caneGuy <gi...@git.apache.org>.
Github user caneGuy commented on the issue:

    https://github.com/apache/spark/pull/19764
  
    @gczsjdy Added a simple example.


---



[GitHub] spark issue #19764: [SPARK-22539][SQL] Add second order for rangepartitioner...

Posted by hvanhovell <gi...@git.apache.org>.
Github user hvanhovell commented on the issue:

    https://github.com/apache/spark/pull/19764
  
    Doesn't this break the `Partitioner` contract? Rows for the same key will now map to multiple partitions.


---



[GitHub] spark pull request #19764: [SPARK-22539][SQL] Add second order for rangepart...

Posted by caneGuy <gi...@git.apache.org>.
Github user caneGuy closed the pull request at:

    https://github.com/apache/spark/pull/19764


---



[GitHub] spark issue #19764: [SPARK-22539][SQL] Add second order for rangepartitioner...

Posted by caneGuy <gi...@git.apache.org>.
Github user caneGuy commented on the issue:

    https://github.com/apache/spark/pull/19764
  
    @gczsjdy OK, I will post it later.


---



[GitHub] spark pull request #19764: [SPARK-22539][SQL] Add second order for rangepart...

Posted by caneGuy <gi...@git.apache.org>.
Github user caneGuy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19764#discussion_r151356810
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala ---
    @@ -193,14 +193,30 @@ object GenerateOrdering extends CodeGenerator[Seq[SortOrder], Ordering[InternalR
     /**
      * A lazily generated row ordering comparator.
      */
    -class LazilyGeneratedOrdering(val ordering: Seq[SortOrder])
    +class LazilyGeneratedOrdering(
    +    val ordering: Seq[SortOrder], secondOrdering: => Seq[SortOrder] = Seq.empty)
       extends Ordering[InternalRow] with KryoSerializable {
     
       def this(ordering: Seq[SortOrder], inputSchema: Seq[Attribute]) =
         this(ordering.map(BindReferences.bindReference(_, inputSchema)))
     
       @transient
    -  private[this] var generatedOrdering = GenerateOrdering.generate(ordering)
    +  private[this] var generatedOrdering = {
    +    var generatedOrdering = GenerateOrdering.generate(ordering)
    +    if (!secondOrdering.isEmpty) {
    +      secondOrdering.foreach { order =>
    --- End diff --
    
    The logic here filters out input sort orders for which an ordering cannot be generated.


---



[GitHub] spark issue #19764: [SPARK-22539][SQL] Add second order for rangepartitioner...

Posted by hvanhovell <gi...@git.apache.org>.
Github user hvanhovell commented on the issue:

    https://github.com/apache/spark/pull/19764
  
    @caneGuy adding the second ordering makes the sort much more fine-grained; this means that the range partitioner is probably going to use different range boundaries, computed from all the ordering parameters.
    
    Let's work through an example here. Say we want to range partition `col1` into 5 partitions, and `col2` will be added to the secondary ordering by your PR. `col1` has only 3 distinct values to keep the example simple. The following table shows how the columns can map to partitions in the current situation and in the new situation:
    
    | col1 | col2 | current partitioning | PR partitioning |
    | ---- | ---- | -------------------- | --------------- |
    | 0    | 0    | 0                    | 0               |
    | 1    | 0    | 1                    | 1               |
    | 1    | 0    | 1                    | 1               |
    | 1    | 1    | 1                    | 2               |
    | 2    | 0    | 2                    | 3               |
    | 2    | 1    | 2                    | 4               |
    
    Notice how by adding the secondary ordering the partitioning changes. This breaks the contract of the range partitioner, and will lead to invalid results if we use this for anything besides a top level global ordering (i.e. `select * from tbl_x order by col1`).
    
    If you only want to improve the performance of top level global orderings then it is better to write an optimizer rule/planning strategy that explicitly adds the secondary ordering columns to the ordering. The only caveat here might be that this can hurt the performance in the non-skewed case; Spark uses radix sorting for simple sorts, which is generally faster than the timsort used for complex cases.
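
To make the table above concrete, here is a minimal Scala sketch that reproduces its two partition columns. It is not Spark's `RangePartitioner`: the `Row` case class, the `partitionOf` helper, and the hand-picked upper bounds are assumptions made for illustration (Spark would derive the bounds by sampling the data).

```scala
object RangePartitionSketch {
  // A row with the two columns from the table above.
  final case class Row(col1: Int, col2: Int)

  // Assign a key to the first range whose upper bound is >= key;
  // keys above every bound go to the last partition.
  def partitionOf[K](key: K, upperBounds: Seq[K])(implicit ord: Ordering[K]): Int = {
    val idx = upperBounds.indexWhere(bound => ord.lteq(key, bound))
    if (idx >= 0) idx else upperBounds.length
  }

  val rows: Seq[Row] =
    Seq(Row(0, 0), Row(1, 0), Row(1, 0), Row(1, 1), Row(2, 0), Row(2, 1))

  // Hand-picked bounds (assumption): not what Spark's sampling would necessarily produce.
  val col1Bounds: Seq[Int] = Seq(0, 1)
  val compositeBounds: Seq[(Int, Int)] = Seq((0, 0), (1, 0), (1, 1), (2, 0))

  // Partition ids when only col1 is the partitioning key.
  val current: Seq[Int] = rows.map(r => partitionOf(r.col1, col1Bounds))

  // Partition ids when (col1, col2) is the partitioning key.
  val withSecondOrder: Seq[Int] =
    rows.map(r => partitionOf((r.col1, r.col2), compositeBounds))

  // For each col1 value, the set of partitions its rows land in.
  def partitionsPerCol1(ids: Seq[Int]): Map[Int, Set[Int]] =
    rows.zip(ids).groupBy(_._1.col1).map { case (k, v) => k -> v.map(_._2).toSet }
}
```

With these bounds, `current` evaluates to `0, 1, 1, 1, 2, 2` and `withSecondOrder` to `0, 1, 1, 2, 3, 4`, matching the table. `partitionsPerCol1(withSecondOrder)` maps `col1 = 1` to partitions 1 and 2, which is the situation described above: rows that share a `col1` value no longer share a partition.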



---



[GitHub] spark issue #19764: [SPARK-22539][SQL] Add second order for rangepartitioner...

Posted by caneGuy <gi...@git.apache.org>.
Github user caneGuy commented on the issue:

    https://github.com/apache/spark/pull/19764
  
    Ping: could any admin help review this? Thanks.


---



[GitHub] spark issue #19764: [SPARK-22539][SQL] Add second order for rangepartitioner...

Posted by caneGuy <gi...@git.apache.org>.
Github user caneGuy commented on the issue:

    https://github.com/apache/spark/pull/19764
  
    Thanks @hvanhovell. IIUC, you mean that adding the second order will break the contract of the range partitioner generated by `ShuffleExchange`? If so, I agree.
    And sorry, but I can't figure out why `select * from tbl_x order by col1` would get an invalid result.
    Maybe I am missing some other knowledge. Thanks again!



---



[GitHub] spark issue #19764: [SPARK-22539][SQL] Add second order for rangepartitioner...

Posted by gczsjdy <gi...@git.apache.org>.
Github user gczsjdy commented on the issue:

    https://github.com/apache/spark/pull/19764
  
    @caneGuy Can you give a specific example to illustrate your change? Maybe the partition result before and after the change?


---



[GitHub] spark pull request #19764: [SPARK-22539][SQL] Add second order for rangepart...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19764#discussion_r151346123
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala ---
    @@ -193,14 +193,30 @@ object GenerateOrdering extends CodeGenerator[Seq[SortOrder], Ordering[InternalR
     /**
      * A lazily generated row ordering comparator.
      */
    -class LazilyGeneratedOrdering(val ordering: Seq[SortOrder])
    +class LazilyGeneratedOrdering(
    +    val ordering: Seq[SortOrder], secondOrdering: => Seq[SortOrder] = Seq.empty)
       extends Ordering[InternalRow] with KryoSerializable {
     
       def this(ordering: Seq[SortOrder], inputSchema: Seq[Attribute]) =
         this(ordering.map(BindReferences.bindReference(_, inputSchema)))
     
       @transient
    -  private[this] var generatedOrdering = GenerateOrdering.generate(ordering)
    +  private[this] var generatedOrdering = {
    +    var generatedOrdering = GenerateOrdering.generate(ordering)
    +    if (!secondOrdering.isEmpty) {
    +      secondOrdering.foreach { order =>
    --- End diff --
    
    These two lines are redundant


---



[GitHub] spark pull request #19764: [SPARK-22539][SQL] Add second order for rangepart...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19764#discussion_r151346234
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala ---
    @@ -193,14 +193,30 @@ object GenerateOrdering extends CodeGenerator[Seq[SortOrder], Ordering[InternalR
     /**
      * A lazily generated row ordering comparator.
      */
    -class LazilyGeneratedOrdering(val ordering: Seq[SortOrder])
    +class LazilyGeneratedOrdering(
    +    val ordering: Seq[SortOrder], secondOrdering: => Seq[SortOrder] = Seq.empty)
       extends Ordering[InternalRow] with KryoSerializable {
     
       def this(ordering: Seq[SortOrder], inputSchema: Seq[Attribute]) =
         this(ordering.map(BindReferences.bindReference(_, inputSchema)))
     
       @transient
    -  private[this] var generatedOrdering = GenerateOrdering.generate(ordering)
    +  private[this] var generatedOrdering = {
    +    var generatedOrdering = GenerateOrdering.generate(ordering)
    +    if (!secondOrdering.isEmpty) {
    +      secondOrdering.foreach { order =>
    +        try {
    +          GenerateOrdering.generate(Seq(order))
    +          ordering ++ Seq(order)
    --- End diff --
    
    This does not modify a collection and its return value goes nowhere
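
The point about `++` can be illustrated with a small standalone Scala sketch. It uses plain strings as stand-ins for `SortOrder` values, and the object and method names are hypothetical; only the behaviour of `++` on an immutable `Seq` is taken from the standard library.

```scala
object AppendSketch {
  // Stand-in for the `ordering` constructor parameter (strings instead of SortOrder).
  val ordering: Seq[String] = Seq("col1")

  // Mirrors the pattern in the diff: ++ builds a new Seq that is then discarded.
  def appendDiscarded(extra: Seq[String]): Seq[String] = {
    extra.foreach { order => ordering ++ Seq(order) } // result is dropped
    ordering // unchanged
  }

  // One possible fix: keep the value returned by ++ at each step.
  def appendKept(extra: Seq[String]): Seq[String] =
    extra.foldLeft(ordering)((acc, order) => acc ++ Seq(order))
}
```

Here `appendDiscarded(Seq("col2"))` still returns only `col1`, while `appendKept(Seq("col2"))` returns `col1, col2`. Whether a fold is the right shape for the actual patch is a separate question; the sketch only shows why the original line has no effect.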


---



[GitHub] spark issue #19764: [SPARK-22539][SQL] Add second order for rangepartitioner...

Posted by caneGuy <gi...@git.apache.org>.
Github user caneGuy commented on the issue:

    https://github.com/apache/spark/pull/19764
  
    Ping @srowen, can you help review this? Thanks very much.


---



[GitHub] spark issue #19764: [SPARK-22539][SQL] Add second order for rangepartitioner...

Posted by caneGuy <gi...@git.apache.org>.
Github user caneGuy commented on the issue:

    https://github.com/apache/spark/pull/19764
  
    Firstly, thanks very much @hvanhovell.
    And sorry for replying so late; I have had some other things to handle recently.
    As for the question, I think the ordering will not be broken. I did not modify the `RangePartitioner` itself, but optimized the partitioning strategy passed from SQL. This will not break the `Partitioner` contract, since the same key will still map to the same partition.
    Note that the key here contains the attributes added by the second order.


---



[GitHub] spark issue #19764: [SPARK-22539][SQL] Add second order for rangepartitioner...

Posted by caneGuy <gi...@git.apache.org>.
Github user caneGuy commented on the issue:

    https://github.com/apache/spark/pull/19764
  
    Ping 


---



[GitHub] spark pull request #19764: [SPARK-22539][SQL] Add second order for rangepart...

Posted by caneGuy <gi...@git.apache.org>.
Github user caneGuy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19764#discussion_r151375208
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala ---
    @@ -193,14 +193,30 @@ object GenerateOrdering extends CodeGenerator[Seq[SortOrder], Ordering[InternalR
     /**
      * A lazily generated row ordering comparator.
      */
    -class LazilyGeneratedOrdering(val ordering: Seq[SortOrder])
    +class LazilyGeneratedOrdering(
    +    val ordering: Seq[SortOrder], secondOrdering: => Seq[SortOrder] = Seq.empty)
       extends Ordering[InternalRow] with KryoSerializable {
     
       def this(ordering: Seq[SortOrder], inputSchema: Seq[Attribute]) =
         this(ordering.map(BindReferences.bindReference(_, inputSchema)))
     
       @transient
    -  private[this] var generatedOrdering = GenerateOrdering.generate(ordering)
    +  private[this] var generatedOrdering = {
    +    var generatedOrdering = GenerateOrdering.generate(ordering)
    +    if (!secondOrdering.isEmpty) {
    +      secondOrdering.foreach { order =>
    +        try {
    +          GenerateOrdering.generate(Seq(order))
    +          ordering ++ Seq(order)
    --- End diff --
    
    Updated, @srowen. Thanks.


---



[GitHub] spark issue #19764: [SPARK-22539][SQL] Add second order for rangepartitioner...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19764
  
    Can one of the admins verify this patch?


---



[GitHub] spark issue #19764: [SPARK-22539][SQL] Add second order for rangepartitioner...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on the issue:

    https://github.com/apache/spark/pull/19764
  
    I'm probably not qualified to review this. I don't think you addressed Herman's question. It wasn't about ordering or whether the same exact row maps to the same partition, but whether all values for a key map to the same partition. I believe that's part of the contract here. If it doesn't do that, then I don't see how it solves the problem you're trying to solve. Skew is inherently a problem if you promise to put all values for a key together.
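
The last sentence can be checked with a small Scala sketch. The data set and the modulo-based `partitionOf` are made up for illustration and are not Spark code; the only property that matters is that the partition depends on the key alone, so all rows for a key stay together.

```scala
object SkewSketch {
  // 90 rows share the hot key 7; ten other keys appear once each (made-up data).
  val keys: Seq[Int] = Seq.fill(90)(7) ++ (100 until 110)

  // A partitioner that depends only on the key keeps all rows of a key together.
  def partitionOf(key: Int, numPartitions: Int): Int =
    Math.floorMod(key, numPartitions)

  // Size of the largest partition for a given partition count.
  def largestPartition(numPartitions: Int): Int =
    keys.groupBy(partitionOf(_, numPartitions)).values.map(_.size).max
}
```

However many partitions are requested, `largestPartition` never drops below 90 here, because every row with key 7 must land in the same partition. Spreading those rows out requires giving up that guarantee, which is the contract question raised earlier in the thread.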


---
