You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by henryr <gi...@git.apache.org> on 2018/08/23 23:26:50 UTC

[GitHub] spark pull request #22211: [SPARK-23207][SPARK-22905][SPARK-24564][SPARK-251...

GitHub user henryr opened a pull request:

    https://github.com/apache/spark/pull/22211

    [SPARK-23207][SPARK-22905][SPARK-24564][SPARK-25114][SQL][BACKPORT-2.…

    ## What changes were proposed in this pull request?
    
        Back port of #20393 and #22079.
    
        Currently shuffle repartition uses RoundRobinPartitioning, the generated result is nondeterministic since the sequence of input rows are not determined.
    
        The bug can be triggered when there is a repartition call following a shuffle (which would lead to non-deterministic row ordering), as the pattern shows below:
        upstream stage -> repartition stage -> result stage
        (-> indicate a shuffle)
        When one of the executors process goes down, some tasks on the repartition stage will be retried and generate inconsistent ordering, and some tasks of the result stage will be retried generating different data.
    
        The following code returns 931532, instead of 1000000:
        ```
        import scala.sys.process._
    
        import org.apache.spark.TaskContext
        val res = spark.range(0, 1000 * 1000, 1).repartition(200).map { x =>
          x
        }.repartition(200).map { x =>
          if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 2) {
            throw new Exception("pkill -f java".!!)
          }
          x
        }
        res.distinct().count()
        ```
    
        In this PR, we propose a most straight-forward way to fix this problem by performing a local sort before partitioning, after we make the input row ordering deterministic, the function from rows to partitions is fully deterministic too.
    
        The downside of the approach is that with extra local sort inserted, the performance of repartition() will go down, so we add a new config named `spark.sql.execution.sortBeforeRepartition` to control whether this patch is applied. The patch is default enabled to be safe-by-default, but user may choose to manually turn it off to avoid performance regression.
    
        This patch also changes the output rows ordering of repartition(), that leads to a bunch of test cases failure because they are comparing the results directly.
    
        Add unit test in ExchangeSuite.
    
        With this patch(and `spark.sql.execution.sortBeforeRepartition` set to true), the following query returns 1000000:
        ```
        import scala.sys.process._
    
        import org.apache.spark.TaskContext
    
        spark.conf.set("spark.sql.execution.sortBeforeRepartition", "true")
    
        val res = spark.range(0, 1000 * 1000, 1).repartition(200).map { x =>
          x
        }.repartition(200).map { x =>
          if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 2) {
            throw new Exception("pkill -f java".!!)
          }
          x
        }
        res.distinct().count()
    
        res7: Long = 1000000
        ```
    
        Author: Xingbo Jiang <xingbo.jiangdatabricks.com>


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/henryr/spark spark-23207-branch-2.1

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/22211.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #22211
    
----
commit 5bdd9e353287c2a326138b25f80ee255d15942b0
Author: Xingbo Jiang <xi...@...>
Date:   2018-08-23T21:22:56Z

    [SPARK-23207][SPARK-22905][SPARK-24564][SPARK-25114][SQL][BACKPORT-2.1] Shuffle+Repartition on a DataFrame could lead to incorrect answers
    
    ## What changes were proposed in this pull request?
    
    Back port of #20393 and #22079.
    
    Currently shuffle repartition uses RoundRobinPartitioning, the generated result is nondeterministic since the sequence of input rows are not determined.
    
    The bug can be triggered when there is a repartition call following a shuffle (which would lead to non-deterministic row ordering), as the pattern shows below:
    upstream stage -> repartition stage -> result stage
    (-> indicate a shuffle)
    When one of the executors process goes down, some tasks on the repartition stage will be retried and generate inconsistent ordering, and some tasks of the result stage will be retried generating different data.
    
    The following code returns 931532, instead of 1000000:
    ```
    import scala.sys.process._
    
    import org.apache.spark.TaskContext
    val res = spark.range(0, 1000 * 1000, 1).repartition(200).map { x =>
      x
    }.repartition(200).map { x =>
      if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 2) {
        throw new Exception("pkill -f java".!!)
      }
      x
    }
    res.distinct().count()
    ```
    
    In this PR, we propose a most straight-forward way to fix this problem by performing a local sort before partitioning, after we make the input row ordering deterministic, the function from rows to partitions is fully deterministic too.
    
    The downside of the approach is that with extra local sort inserted, the performance of repartition() will go down, so we add a new config named `spark.sql.execution.sortBeforeRepartition` to control whether this patch is applied. The patch is default enabled to be safe-by-default, but user may choose to manually turn it off to avoid performance regression.
    
    This patch also changes the output rows ordering of repartition(), that leads to a bunch of test cases failure because they are comparing the results directly.
    
    Add unit test in ExchangeSuite.
    
    With this patch(and `spark.sql.execution.sortBeforeRepartition` set to true), the following query returns 1000000:
    ```
    import scala.sys.process._
    
    import org.apache.spark.TaskContext
    
    spark.conf.set("spark.sql.execution.sortBeforeRepartition", "true")
    
    val res = spark.range(0, 1000 * 1000, 1).repartition(200).map { x =>
      x
    }.repartition(200).map { x =>
      if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 2) {
        throw new Exception("pkill -f java".!!)
      }
      x
    }
    res.distinct().count()
    
    res7: Long = 1000000
    ```
    
    Author: Xingbo Jiang <xingbo.jiangdatabricks.com>
    
    ## How was this patch tested?
    
    Ran all SBT unit tests for org.apache.spark.sql.*.
    
    Ran pyspark tests for module pyspark-sql.
    
    Closes #22079 from bersprockets/SPARK-23207.
    
    Lead-authored-by: Xingbo Jiang <xi...@databricks.com>
    Co-authored-by: Bruce Robbins <be...@gmail.com>
    Co-authored-by: Zheng RuiFeng <ru...@foxmail.com>
    Signed-off-by: Xiao Li <ga...@gmail.com>

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22211: [SPARK-23207][SPARK-22905][SPARK-24564][SPARK-25114][SQL...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22211
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22211: [SPARK-23207][SPARK-22905][SPARK-24564][SPARK-25114][SQL...

Posted by henryr <gi...@git.apache.org>.
Github user henryr commented on the issue:

    https://github.com/apache/spark/pull/22211
  
    Merged to 2.1


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22211: [SPARK-23207][SPARK-22905][SPARK-24564][SPARK-25114][SQL...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22211
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95186/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22211: [SPARK-23207][SPARK-22905][SPARK-24564][SPARK-25114][SQL...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22211
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22211: [SPARK-23207][SPARK-22905][SPARK-24564][SPARK-25114][SQL...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22211
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22211: [SPARK-23207][SPARK-22905][SPARK-24564][SPARK-251...

Posted by henryr <gi...@git.apache.org>.
Github user henryr closed the pull request at:

    https://github.com/apache/spark/pull/22211


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22211: [SPARK-23207][SPARK-22905][SPARK-24564][SPARK-25114][SQL...

Posted by dongjoon-hyun <gi...@git.apache.org>.
Github user dongjoon-hyun commented on the issue:

    https://github.com/apache/spark/pull/22211
  
    Hi, @henryr . Since this is merged, could you close this PR?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22211: [SPARK-23207][SPARK-22905][SPARK-24564][SPARK-25114][SQL...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22211
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/2511/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22211: [SPARK-23207][SPARK-22905][SPARK-24564][SPARK-25114][SQL...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22211
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95189/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22211: [SPARK-23207][SPARK-22905][SPARK-24564][SPARK-25114][SQL...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22211
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/2514/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22211: [SPARK-23207][SPARK-22905][SPARK-24564][SPARK-25114][SQL...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22211
  
    **[Test build #95186 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95186/testReport)** for PR 22211 at commit [`5bdd9e3`](https://github.com/apache/spark/commit/5bdd9e353287c2a326138b25f80ee255d15942b0).
     * This patch **fails to build**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `public final class RecordBinaryComparator extends RecordComparator `


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22211: [SPARK-23207][SPARK-22905][SPARK-24564][SPARK-25114][SQL...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22211
  
    **[Test build #95189 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95189/testReport)** for PR 22211 at commit [`d269015`](https://github.com/apache/spark/commit/d26901516f414d16f8eb38183053d22dc3b056e3).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22211: [SPARK-23207][SPARK-22905][SPARK-24564][SPARK-25114][SQL...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22211
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22211: [SPARK-23207][SPARK-22905][SPARK-24564][SPARK-25114][SQL...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22211
  
    **[Test build #95186 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95186/testReport)** for PR 22211 at commit [`5bdd9e3`](https://github.com/apache/spark/commit/5bdd9e353287c2a326138b25f80ee255d15942b0).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22211: [SPARK-23207][SPARK-22905][SPARK-24564][SPARK-25114][SQL...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on the issue:

    https://github.com/apache/spark/pull/22211
  
    Thanks! Merged to 2.1


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22211: [SPARK-23207][SPARK-22905][SPARK-24564][SPARK-25114][SQL...

Posted by jiangxb1987 <gi...@git.apache.org>.
Github user jiangxb1987 commented on the issue:

    https://github.com/apache/spark/pull/22211
  
    LGTM


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22211: [SPARK-23207][SPARK-22905][SPARK-24564][SPARK-25114][SQL...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on the issue:

    https://github.com/apache/spark/pull/22211
  
    cc @jiangxb1987 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22211: [SPARK-23207][SPARK-22905][SPARK-24564][SPARK-25114][SQL...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22211
  
    **[Test build #95189 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95189/testReport)** for PR 22211 at commit [`d269015`](https://github.com/apache/spark/commit/d26901516f414d16f8eb38183053d22dc3b056e3).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org