Posted to reviews@spark.apache.org by jiangxb1987 <gi...@git.apache.org> on 2018/01/27 00:42:31 UTC

[GitHub] spark pull request #20414: [SPARK-23243][SQL] Shuffle+Repartition on an RDD ...

GitHub user jiangxb1987 opened a pull request:

    https://github.com/apache/spark/pull/20414

    [SPARK-23243][SQL] Shuffle+Repartition on an RDD could lead to incorrect answers

    ## What changes were proposed in this pull request?
    
    RDD repartition also uses round-robin to distribute data, which can cause incorrect answers on RDD workloads in the same way as in #20393.
    
    However, the approach that fixes DataFrame.repartition() doesn't apply to the RDD repartition issue, because the input data can be non-comparable, as discussed in https://github.com/apache/spark/pull/20393#issuecomment-360912451
    
    Here, I propose a quick fix that distributes elements by their hashes. This will cause a perf regression if you have highly skewed input data, but it will ensure result correctness.
    
    ## How was this patch tested?
    
    Added a test case in `RDDSuite` to ensure `RDD.repartition()` generates consistent answers.
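
The trade-off described above can be illustrated with a small Python sketch. It is not the patch itself: `round_robin`, `by_hash`, and the use of `zlib.crc32` as the hash are assumptions made only for this illustration. It shows that round-robin placement depends on the order in which records arrive, while hash placement depends only on each record's value, at the cost of skew when many records are equal.

```python
import zlib

def round_robin(records, num_partitions):
    """Assign each record to a partition by its position in the input."""
    parts = [[] for _ in range(num_partitions)]
    for i, rec in enumerate(records):
        parts[i % num_partitions].append(rec)
    return parts

def by_hash(records, num_partitions):
    """Assign each record to a partition by a hash of its value."""
    parts = [[] for _ in range(num_partitions)]
    for rec in records:
        h = zlib.crc32(str(rec).encode("utf-8"))
        parts[h % num_partitions].append(rec)
    return parts

def as_sets(parts):
    """Compare partitions by membership, ignoring order inside a partition."""
    return [frozenset(p) for p in parts]

first_attempt = ["a", "b", "c", "d", "e", "f"]
retry_attempt = ["b", "a", "c", "d", "f", "e"]  # same records, different arrival order

# Round-robin: a retried task may place records in different partitions.
rr_same = as_sets(round_robin(first_attempt, 3)) == as_sets(round_robin(retry_attempt, 3))
# Hash: placement is independent of arrival order.
hash_same = as_sets(by_hash(first_attempt, 3)) == as_sets(by_hash(retry_attempt, 3))
print(rr_same, hash_same)

# Skew: identical values all land in one partition under hash placement.
print([len(p) for p in by_hash(["x"] * 6, 3)])
```

The last line is the perf regression mentioned in the description: with highly repeated values, one partition receives everything.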

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/jiangxb1987/spark rdd-repartition

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/20414.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #20414
    
----
commit 6910ed62c272bedfa251cab589bb52bed36be3ed
Author: Xingbo Jiang <xi...@...>
Date:   2018-01-27T00:34:24Z

    fix RDD.repartition()

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20414: [SPARK-23243][SQL] Shuffle+Repartition on an RDD could l...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on the issue:

    https://github.com/apache/spark/pull/20414
  
    @jiangxb1987 You are correct when the sizes of the maps are the same.
    But if the map sizes are different, the resulting order can be different, which can happen when requests for additional memory follow different patterns on re-execution (triggering a spill).


---



[GitHub] spark issue #20414: [SPARK-23243][SQL] Shuffle+Repartition on an RDD could l...

Posted by sameeragarwal <gi...@git.apache.org>.
Github user sameeragarwal commented on the issue:

    https://github.com/apache/spark/pull/20414
  
    Thanks @mridulm, all great points!


---



[GitHub] spark issue #20414: [SPARK-23243][SQL] Shuffle+Repartition on an RDD could l...

Posted by jiangxb1987 <gi...@git.apache.org>.
Github user jiangxb1987 commented on the issue:

    https://github.com/apache/spark/pull/20414
  
    Hey, I looked into `ExternalAppendOnlyMap` and here are the findings:
    The `ExternalAppendOnlyMap` claims it keeps the content sorted, but it actually uses a `HashComparator` that compares the elements by their hashes. Luckily, it sorts the elements using TimSort, which is stable. That means that even if there are hash collisions, the output sequence should still be deterministic, as long as the inputs are (which we can achieve by modifying `ShuffleBlockFetcherIterator` per previous discussion).
    
    We may need to check all the other places where we may spill/compare objects to ensure we generate a deterministic output sequence everywhere, though.
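
The stable-sort argument above can be sketched in Python, whose `sorted()` is also a stable Timsort. The `hash_of` function is a hypothetical stand-in for the hash comparator, chosen so that keys starting with the same letter collide. The sketch shows both halves of the claim: the same input always yields the same output, but colliding keys come out in whatever order they went in.

```python
def hash_of(key):
    # Hypothetical hash for illustration: keys with the same first letter collide.
    return ord(key[0])

def sort_by_hash(keys):
    # sorted() is stable, so keys with equal hashes keep their input order.
    return sorted(keys, key=hash_of)

run_1 = ["b1", "a1", "a2", "b2"]
run_2 = ["b1", "a2", "a1", "b2"]  # same keys, but a1 and a2 arrive swapped

out_1 = sort_by_hash(run_1)
out_2 = sort_by_hash(run_2)

print(out_1)  # ['a1', 'a2', 'b1', 'b2']
print(out_2)  # ['a2', 'a1', 'b1', 'b2']
```

So stability only helps if the input sequence is itself repeatable, which is why the fetch order matters.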


---



[GitHub] spark issue #20414: [SPARK-23243][SQL] Shuffle+Repartition on an RDD could l...

Posted by jiangxb1987 <gi...@git.apache.org>.
Github user jiangxb1987 commented on the issue:

    https://github.com/apache/spark/pull/20414
  
    Ouch... Yea, we have to figure out a way to make it deterministic under hash collisions.


---



[GitHub] spark issue #20414: [SPARK-23243][SQL] Shuffle+Repartition on an RDD could l...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on the issue:

    https://github.com/apache/spark/pull/20414
  
    @shivaram Thinking more, everything which does a zip (or variants/similar idioms like limit K, etc) on a partition should be affected, with random + index in coalesce + shuffle=true being one special case.
    
    Essentially anything which assumes that order of records in a partition will always be the same - currently,
    * Reading from an external immutable source like hdfs, etc (including checkpoint)
    * Reading from block store
    * Sorted partitions 
    should guarantee this - others need not.
    
    The more I think about it, the more I like @sameeragarwal's suggestion in #20393: a general solution for this could be to introduce deterministic output for shuffle fetch which, when enabled, takes a more expensive but repeatable iteration of shuffle fetch.
    
    This assumes that spark shuffle is always repeatable given the same input (I am yet to look into this in detail when spills are involved - any thoughts @sameeragarwal ?), which could be an implementation detail; but we could make it a requirement for shuffle.
    
    Note that we might be able to avoid this additional cost for most of the current use cases (otherwise we would have faced this problem 2 major releases ago!); so actual user impact, hopefully, might not be as high.


---



[GitHub] spark issue #20414: [SPARK-23243][SQL] Shuffle+Repartition on an RDD could l...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20414
  
    **[Test build #93558 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93558/testReport)** for PR 20414 at commit [`6910ed6`](https://github.com/apache/spark/commit/6910ed62c272bedfa251cab589bb52bed36be3ed).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #20414: [SPARK-23243][SQL] Shuffle+Repartition on an RDD could l...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on the issue:

    https://github.com/apache/spark/pull/20414
  
    In addition, any use of random in spark code will be affected by this, unless the input is an idempotent source, even if random initialization is done predictably with the partition index (which we were doing here anyway).
    We might want to look at mllib and other places as well.
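
A minimal Python sketch of this point, assuming a hypothetical `sample_partition` helper that seeds its generator with the partition index: the sequence of random draws is repeatable, but the draws are applied by position, so a different record order selects different records.

```python
import random

def sample_partition(records, partition_index, fraction=0.5):
    """Bernoulli-sample one partition with an RNG seeded by its index."""
    rng = random.Random(partition_index)  # same seed on every attempt
    return [rec for rec in records if rng.random() < fraction]

first_attempt = list(range(20))
# Same records, shifted by one position, as a reordered shuffle fetch might deliver them.
retry_attempt = first_attempt[1:] + first_attempt[:1]

sample_a = sample_partition(first_attempt, 0)
sample_b = sample_partition(retry_attempt, 0)

# Repeatable when the input order is repeatable...
print(sample_partition(first_attempt, 0) == sample_a)
# ...but a reordered input yields a different sample despite the fixed seed.
print(set(sample_a) == set(sample_b))
```

Seeding by partition index fixes which positions are kept, not which records, so the result is only as deterministic as the record order.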


---



[GitHub] spark issue #20414: [SPARK-23243][SQL] Shuffle+Repartition on an RDD could l...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20414
  
    **[Test build #86728 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86728/testReport)** for PR 20414 at commit [`6910ed6`](https://github.com/apache/spark/commit/6910ed62c272bedfa251cab589bb52bed36be3ed).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #20414: [SPARK-23243][SQL] Shuffle+Repartition on an RDD ...

Posted by jiangxb1987 <gi...@git.apache.org>.
Github user jiangxb1987 closed the pull request at:

    https://github.com/apache/spark/pull/20414


---



[GitHub] spark issue #20414: [SPARK-23243][SQL] Shuffle+Repartition on an RDD could l...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20414
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/86728/
    Test PASSed.


---



[GitHub] spark issue #20414: [SPARK-23243][SQL] Shuffle+Repartition on an RDD could l...

Posted by jiangxb1987 <gi...@git.apache.org>.
Github user jiangxb1987 commented on the issue:

    https://github.com/apache/spark/pull/20414
  
    @mridulm I also agree we should follow @sameeragarwal's suggestion to let shuffle fetch produce deterministic output, and only do this for a few operations (e.g. repartition/zipWithIndex, do we have more?). IIUC spill should NOT affect the result, but if you find any suspects, please kindly share them with us. :)


---



[GitHub] spark issue #20414: [SPARK-23243][SQL] Shuffle+Repartition on an RDD could l...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20414
  
    **[Test build #93558 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93558/testReport)** for PR 20414 at commit [`6910ed6`](https://github.com/apache/spark/commit/6910ed62c272bedfa251cab589bb52bed36be3ed).


---



[GitHub] spark issue #20414: [SPARK-23243][SQL] Shuffle+Repartition on an RDD could l...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20414
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/304/
    Test PASSed.


---



[GitHub] spark issue #20414: [SPARK-23243][SQL] Shuffle+Repartition on an RDD could l...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/20414
  
    > Not quite - coalesce will not combine partitions across executors (aka shuffle) so you could still end up having many many files.
    
    I'm not sure if I follow here. For `coalesce(1)` Spark just launches a single task to handle all the data partitions. If the final action is saving a file, we still have only one file at the end. Compared to `repartition(1)`, I think the only difference is the cost of task retry.
    
    I think we can special-case `repartition(1)`: if there is only one reducer, we don't need to add the local sort.


---



[GitHub] spark issue #20414: [SPARK-23243][SQL] Shuffle+Repartition on an RDD could l...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20414
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/93558/
    Test FAILed.


---



[GitHub] spark issue #20414: [SPARK-23243][SQL] Shuffle+Repartition on an RDD could l...

Posted by dongjoon-hyun <gi...@git.apache.org>.
Github user dongjoon-hyun commented on the issue:

    https://github.com/apache/spark/pull/20414
  
    Hi, @jiangxb1987 . Could you close this PR?


---



[GitHub] spark issue #20414: [SPARK-23243][SQL] Shuffle+Repartition on an RDD could l...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on the issue:

    https://github.com/apache/spark/pull/20414
  
    @jiangxb1987 Unfortunately I am unable to analyze this in detail, but hopefully I can give some pointers which will help!
    
    One example I can think of is a shuffle which uses Aggregator (like combineByKey), via ExternalAppendOnlyMap.
    The order in which we replay the keys with the same hash is non-deterministic from what I remember. For example, if the first run did not result in any spills, the second run had 3 spills and the third run had 7, the order of keys (with the same hash) could be different in each.
    
    Similarly, with sort based shuffle, depending on the length of the data array in AppendOnlyMap (which is determined by whether we spilled or not) we can get different sort orders?
    Similarly for the actual sort itself, the `merge` quite clearly is sensitive to the number of spills (for example, when there is no aggregator or ordering, it is simply `iterators.iterator.flatten`).
    
    There might be other cases where this is happening - I have not regularly looked at this part of the codebase in a while now unfortunately.
    
    Please note that in all the cases above, there is no ordering defined.
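
The concern about the data array length can be illustrated with a toy open-addressing table in Python. This is only a sketch under stated assumptions: `insertion_layout` uses plain linear probing and `colliding_hash` is a hypothetical hash that maps every key to 3, so it does not reproduce Spark's actual `AppendOnlyMap` probing. It shows how two keys with the same hash can sit in a different slot order depending on table capacity, and how a stable sort by hash then preserves that difference.

```python
def insertion_layout(keys, capacity, hash_fn):
    """Insert keys into a toy linear-probing table; return them in slot order.

    Assumes capacity is larger than the number of distinct keys.
    """
    slots = [None] * capacity
    for key in keys:
        pos = hash_fn(key) % capacity
        while slots[pos] is not None and slots[pos] != key:
            pos = (pos + 1) % capacity  # probe the next slot, wrapping around
        slots[pos] = key
    return [k for k in slots if k is not None]

def colliding_hash(key):
    return 3  # hypothetical: every key gets the same hash

keys = ["A", "B"]
small = insertion_layout(keys, 4, colliding_hash)  # A in slot 3, B wraps to slot 0
large = insertion_layout(keys, 8, colliding_hash)  # A in slot 3, B in slot 4

# A stable sort by hash cannot reorder colliding keys, so the layouts survive.
sorted_small = sorted(small, key=colliding_hash)
sorted_large = sorted(large, key=colliding_hash)
print(sorted_small, sorted_large)  # ['B', 'A'] ['A', 'B']
```

If the capacity at sort time depends on whether a spill happened, the emitted order of colliding keys can differ between attempts even for identical input.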


---



[GitHub] spark issue #20414: [SPARK-23243][SQL] Shuffle+Repartition on an RDD could l...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20414
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #20414: [SPARK-23243][SQL] Shuffle+Repartition on an RDD could l...

Posted by shivaram <gi...@git.apache.org>.
Github user shivaram commented on the issue:

    https://github.com/apache/spark/pull/20414
  
    @jiangxb1987 @mridulm Could we have a special case of using the sort-based approach when the RDD type is comparable? I think that should cover a bunch of the common cases, and the hash version will only be used when keys are not comparable.
    
    Also @mridulm, your point about more things other than repartition being affected is definitely true (just in this file, I think `randomSampleWithRange` is affected). I think the only way to solve this in general is to enforce deterministic ordering when constructing ShuffleRDDs?
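
The special case suggested above can be sketched in Python as follows. The `repartition` helper here is hypothetical and only models the idea: sort locally when the records are comparable and then distribute round-robin, otherwise fall back to hash placement. Python's `TypeError` from `sorted()` stands in for "the type is not comparable", and the built-in `hash()` is an assumption that is only stable within a single process.

```python
def round_robin(records, num_partitions):
    """Assign each record to a partition by its position in the input."""
    parts = [[] for _ in range(num_partitions)]
    for i, rec in enumerate(records):
        parts[i % num_partitions].append(rec)
    return parts

def repartition(records, num_partitions):
    """Sort-then-round-robin for comparable records, hash placement otherwise."""
    try:
        ordered = sorted(records)  # a local sort removes any dependence on arrival order
    except TypeError:
        # Fallback for non-comparable records: place by hash (may be skewed).
        parts = [[] for _ in range(num_partitions)]
        for rec in records:
            parts[hash(rec) % num_partitions].append(rec)
        return parts
    return round_robin(ordered, num_partitions)

# Comparable records: evenly spread and identical across arrival orders.
print(repartition([3, 1, 2, 5, 4], 2))  # [[1, 3, 5], [2, 4]]
print(repartition([5, 4, 3, 2, 1], 2))  # [[1, 3, 5], [2, 4]]

# Complex numbers are hashable but not orderable, so they take the hash path.
fallback = repartition([1j, 2j, 3j], 2)
```

The sorted path keeps the even distribution of round-robin, while the fallback only guarantees stable partition membership.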


---



[GitHub] spark issue #20414: [SPARK-23243][SQL] Shuffle+Repartition on an RDD could l...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20414
  
    **[Test build #86728 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86728/testReport)** for PR 20414 at commit [`6910ed6`](https://github.com/apache/spark/commit/6910ed62c272bedfa251cab589bb52bed36be3ed).


---



[GitHub] spark issue #20414: [SPARK-23243][SQL] Shuffle+Repartition on an RDD could l...

Posted by jiangxb1987 <gi...@git.apache.org>.
Github user jiangxb1987 commented on the issue:

    https://github.com/apache/spark/pull/20414
  
    @felixcheung You are right, I didn't make it clear that there should still be many shuffle blocks, and if the read task is retried it should be slower than using `repartition(1)` directly.
    
    Now I tend to fix the issue following the latter fix-shuffle-fetch-order way, since it may resolve the general cases.


---



[GitHub] spark issue #20414: [SPARK-23243][SQL] Shuffle+Repartition on an RDD could l...

Posted by felixcheung <gi...@git.apache.org>.
Github user felixcheung commented on the issue:

    https://github.com/apache/spark/pull/20414
  
    > Actually for the first case, you shall use coalesce() instead of repartition() to get a similar effect, without need of another shuffle! 
    
    Not quite - coalesce will not combine partitions across executors (aka shuffle) so you could still end up having many many files.
    
    I have seen that quite a bit with large scale ML. But FWIW, my comment earlier was for both "regular" use cases and ML use cases.



---



[GitHub] spark issue #20414: [SPARK-23243][SQL] Shuffle+Repartition on an RDD could l...

Posted by jiangxb1987 <gi...@git.apache.org>.
Github user jiangxb1987 commented on the issue:

    https://github.com/apache/spark/pull/20414
  
    @cloud-fan Yea, you provided a clearer statement here, and I totally agree!


---



[GitHub] spark issue #20414: [SPARK-23243][SQL] Shuffle+Repartition on an RDD could l...

Posted by felixcheung <gi...@git.apache.org>.
Github user felixcheung commented on the issue:

    https://github.com/apache/spark/pull/20414
  
    Just for context, I'm seeing RDD.repartition being used *a lot*.


---



[GitHub] spark issue #20414: [SPARK-23243][SQL] Shuffle+Repartition on an RDD could l...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20414
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #20414: [SPARK-23243][SQL] Shuffle+Repartition on an RDD could l...

Posted by jiangxb1987 <gi...@git.apache.org>.
Github user jiangxb1987 commented on the issue:

    https://github.com/apache/spark/pull/20414
  
    Talked to @yanboliang offline; he said that the major use cases of RDD/DataFrame.repartition() in ML workloads he has observed are:
    1. When saving models, you may need `repartition()` to reduce the number of output files; a typical special case is `xxx.repartition(1)`;
    2. You may use `repartition()` to give the original data set more partitions, to gain higher parallelism for the following operations.
    
    Actually for the first case, you shall use `coalesce()` instead of `repartition()` to get a similar effect, without need of another shuffle! Also, this scenario doesn't strictly require the data set to be distributed evenly, so the change from round-robin partitioning to hash partitioning should be fine.
    For the latter case, if you have a bunch of data with the same values, the change may lead to high data skew and bring a performance regression. Currently the best-effort approach we can choose is to perform a local sort if the data type is comparable (and that also requires a lot of work refactoring the `ExternalSorter`).
    
    Another approach is to let the `ShuffleBlockFetcherIterator` remember the sequence of block fetches, and force the blocks to be fetched one-by-one. This actually involves more issues because we may hit memory limits and therefore have to spill the fetched blocks. IIUC this should resolve the issue for general cases.
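
The fetch-ordering idea can be sketched in Python. This is a simplified model, not `ShuffleBlockFetcherIterator` itself: the function names, the `(block_id, records)` tuples, and the `expected_ids` list are all assumptions for illustration. Blocks that arrive early are buffered and released strictly in a fixed block order; that buffer is where the memory pressure mentioned above would come from.

```python
def consume_in_arrival_order(arrivals):
    """Emit records in whatever order the blocks happened to arrive."""
    out = []
    for _, records in arrivals:
        out.extend(records)
    return out

def consume_in_block_order(arrivals, expected_ids):
    """Buffer early blocks and release them strictly in expected_ids order."""
    pending = {}   # blocks that arrived before their turn (may need spilling)
    out = []
    next_idx = 0
    for block_id, records in arrivals:
        pending[block_id] = records
        # Drain every block whose turn has come.
        while next_idx < len(expected_ids) and expected_ids[next_idx] in pending:
            out.extend(pending.pop(expected_ids[next_idx]))
            next_idx += 1
    return out

attempt_1 = [(0, ["a", "b"]), (1, ["c"]), (2, ["d"])]
attempt_2 = [(2, ["d"]), (0, ["a", "b"]), (1, ["c"])]  # same blocks, other arrival order

print(consume_in_arrival_order(attempt_1) == consume_in_arrival_order(attempt_2))  # False
print(consume_in_block_order(attempt_1, [0, 1, 2]))  # ['a', 'b', 'c', 'd']
print(consume_in_block_order(attempt_2, [0, 1, 2]))  # ['a', 'b', 'c', 'd']
```

In the second attempt block 2 has to wait in `pending` until blocks 0 and 1 are consumed, which is the cost of making the sequence repeatable.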


---



[GitHub] spark issue #20414: [SPARK-23243][SQL] Shuffle+Repartition on an RDD could l...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20414
  
    Merged build finished. Test PASSed.


---
