You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by davies <gi...@git.apache.org> on 2014/08/16 02:32:34 UTC

[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

GitHub user davies opened a pull request:

    https://github.com/apache/spark/pull/1977

    [SPARK-3074] [PySpark] support groupByKey() with single huge key

    This patch change groupByKey() to use external sort based approach, so it can support single huge key.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/davies/spark groupby

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/1977.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1977
    
----
commit 55602ee6122aa5ce5b3f52b66cb0a74a6e275fba
Author: Davies Liu <da...@gmail.com>
Date:   2014-08-15T21:48:34Z

    use external sort in sortBy() and sortByKey()

commit 083d842b2e784674d2ec6823ac4da81140ac48cd
Author: Davies Liu <da...@gmail.com>
Date:   2014-08-16T00:28:06Z

    sorted based groupByKey()

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-90288396
  
    **[Test build #29756 timed out](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29756/consoleFull)**     for PR 1977 at commit [`c6a2f8d`](https://github.com/apache/spark/commit/c6a2f8d1ad67d9e403391399a78202bdd1ffe561)     after a configured wait of `120m`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-91376146
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/29967/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-91069251
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/29895/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-68400353
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24904/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by sryza <gi...@git.apache.org>.
Github user sryza commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-52381086
  
    Does / will the same functionality exist in Scala/Java?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-56776706
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20781/consoleFull) for   PR 1977 at commit [`1f69f93`](https://github.com/apache/spark/commit/1f69f93b637e45680b71f52246469f379f883e02).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-55552290
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/106/consoleFull) for   PR 1977 at commit [`8ef965e`](https://github.com/apache/spark/commit/8ef965e163f68616c7ebb6e65c83314c0c66c3de).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-55939676
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20470/consoleFull) for   PR 1977 at commit [`17f4ec6`](https://github.com/apache/spark/commit/17f4ec6ed91c85f6b1c9f8e6882d21600993c566).
     * This patch **passes** unit tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class ResultIterable(object):`
      * `class FlattedValuesSerializer(BatchedSerializer):`
      * `class SameKey(object):`
      * `class GroupByKey(object):`
      * `class ExternalGroupBy(ExternalMerger):`



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-55928984
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20470/consoleFull) for   PR 1977 at commit [`17f4ec6`](https://github.com/apache/spark/commit/17f4ec6ed91c85f6b1c9f8e6882d21600993c566).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-60718072
  
      [Test build #484 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/484/consoleFull) for   PR 1977 at commit [`ab5515b`](https://github.com/apache/spark/commit/ab5515b4eaf93e95a9d9e04f11538119ca25030b).
     * This patch **does not merge cleanly**.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by sryza <gi...@git.apache.org>.
Github user sryza commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1977#discussion_r20132424
  
    --- Diff: python/pyspark/rdd.py ---
    @@ -1579,21 +1577,34 @@ def createZero():
     
             return self.combineByKey(lambda v: func(createZero(), v), func, func, numPartitions)
     
    +    def _can_spill(self):
    +        return self.ctx._conf.get("spark.shuffle.spill", "True").lower() == "true"
    +
    +    def _memory_limit(self):
    +        return _parse_memory(self.ctx._conf.get("spark.python.worker.memory", "512m"))
    +
         # TODO: support variant with custom partitioner
         def groupByKey(self, numPartitions=None):
             """
             Group the values for each key in the RDD into a single sequence.
    -        Hash-partitions the resulting RDD with into numPartitions partitions.
    +        Hash-partitions the resulting RDD with into numPartitions
    --- End diff --
    
    "with into" should just be "into"


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-60717966
  
    Jenkins, test this please


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-55959414
  
    There's a bit of code duplication between ExternalGroupBy and ExternalMerger, but maybe this is unavoidable.  It would be nice to add a short comment to ExternalGroupBy that briefly summarizes the differences.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-52859847
  
    cc @mateiz @JoshRosen 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-91069245
  
      [Test build #29895 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29895/consoleFull) for   PR 1977 at commit [`0dcf320`](https://github.com/apache/spark/commit/0dcf3208220a46cead751aacee7e7e3e22a5132b).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class FlattenedValuesSerializer(BatchedSerializer):`
      * `class ExternalList(object):`
      * `class GroupByKey(object):`
      * `class ChainedIterable(object):`
      * `class ExternalGroupBy(ExternalMerger):`
    
     * This patch does not change any dependencies.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-55517757
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20305/consoleFull) for   PR 1977 at commit [`fbc504a`](https://github.com/apache/spark/commit/fbc504a0e643ebf79de26ccc07e012e2a0603a0c).
     * This patch **does not** merge cleanly!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-52717442
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18897/consoleFull) for   PR 1977 at commit [`b2dc3bf`](https://github.com/apache/spark/commit/b2dc3bf1885c11ef3b83083c2f4bca048a9e352f).
     * This patch **fails** unit tests.
     * This patch **does not** merge cleanly!



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-91133293
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/29925/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-52376616
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18642/consoleFull) for   PR 1977 at commit [`083d842`](https://github.com/apache/spark/commit/083d842b2e784674d2ec6823ac4da81140ac48cd).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-90356172
  
      [Test build #29779 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29779/consoleFull) for   PR 1977 at commit [`d2f053b`](https://github.com/apache/spark/commit/d2f053b3eb39ff0c42a3e5df27f0ed4accc1c1d5).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-52384600
  
    @sryza There are similar things in Scala, but we can not compare the Python object in Scala, so it can not use the groupByKey() in Scala directly. All the aggregation should be implemented in Python also.
    
    @andrewor14,  I hope PySpark could catch up with Scala.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-52393443
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18674/consoleFull) for   PR 1977 at commit [`b40bae7`](https://github.com/apache/spark/commit/b40bae770c850e9a8a5e742af850100a659c7949).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-91108456
  
    @JoshRosen Thanks for the comments, it looks better now.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-55978058
  
    Also, can you see what happens when you do rdd.groupByKey().cache()? Can we serialize and deserialize these objects back to Scala-land? It's okay if we can't cache overly large groups for now, but we should just make sure this works.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-52386913
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18667/consoleFull) for   PR 1977 at commit [`efa23df`](https://github.com/apache/spark/commit/efa23df9a82c02169bd526e7f99d30c7f6a95de2).
     * This patch **fails** unit tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `abstract class Serializer `
      * `abstract class SerializerInstance `
      * `abstract class SerializationStream `
      * `abstract class DeserializationStream `
      * `class ShuffleBlockManager(blockManager: BlockManager,`
      * `class SameKey(object):`
      * `class GroupByKey(object):`
      * `class ResultIterable(object):`
      * `class FlattedValuesSerializer(BatchedSerializer):`
      * `class ExternalSorter(object):`



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-90717977
  
      [Test build #637 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/637/consoleFull) for   PR 1977 at commit [`2a1857a`](https://github.com/apache/spark/commit/2a1857a4c1d72bee5a93ffbe1da013ffc7ff673b).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-90255941
  
      [Test build #29756 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29756/consoleFull) for   PR 1977 at commit [`c6a2f8d`](https://github.com/apache/spark/commit/c6a2f8d1ad67d9e403391399a78202bdd1ffe561).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-90697704
  
      [Test build #29802 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29802/consoleFull) for   PR 1977 at commit [`2a1857a`](https://github.com/apache/spark/commit/2a1857a4c1d72bee5a93ffbe1da013ffc7ff673b).
     * This patch **fails PySpark unit tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class ResultIterable(object):`
      * `class FlattedValuesSerializer(BatchedSerializer):`
      * `class ExternalList(object):`
      * `class GroupByKey(object):`
      * `class ChainedIterable(object):`
      * `class ExternalGroupBy(ExternalMerger):`
    
     * This patch does not change any dependencies.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1977#discussion_r28013345
  
    --- Diff: python/pyspark/shuffle.py ---
    @@ -529,6 +520,295 @@ def sorted(self, iterator, key=None, reverse=False):
             return heapq.merge(chunks, key=key, reverse=reverse)
     
     
    +class ExternalList(object):
    +    """
    +    ExternalList can have many items which cannot be hold in memory in
    +    the same time.
    +
    +    >>> l = ExternalList(range(100))
    +    >>> len(l)
    +    100
    +    >>> l.append(10)
    +    >>> len(l)
    +    101
    +    >>> for i in range(10240):
    +    ...     l.append(i)
    +    >>> len(l)
    +    10341
    +    >>> import pickle
    +    >>> l2 = pickle.loads(pickle.dumps(l))
    +    >>> len(l2)
    +    10341
    +    >>> list(l2)[100]
    +    10
    +    """
    +    LIMIT = 10240
    +
    +    def __init__(self, values):
    +        self.values = values
    +        self.disk_count = 0
    +        self._file = None
    +        self._ser = None
    +
    +    def __getstate__(self):
    +        if self._file is not None:
    +            self._file.flush()
    +            f = os.fdopen(os.dup(self._file.fileno()))
    +            f.seek(0)
    +            bytes = f.read()
    +        else:
    +            bytes = ''
    +        return self.values, self.disk_count, bytes
    +
    +    def __setstate__(self, item):
    +        self.values, self.disk_count, bytes = item
    +        if bytes:
    +            self._open_file()
    +            self._file.write(bytes)
    +        else:
    +            self._file = None
    +            self._ser = None
    +
    +    def __iter__(self):
    +        if self._file is not None:
    +            self._file.flush()
    +            # read all items from disks first
    +            with os.fdopen(os.dup(self._file.fileno()), 'r') as f:
    +                f.seek(0)
    +                for values in self._ser.load_stream(f):
    +                    for v in values:
    +                        yield v
    +
    +        for v in self.values:
    +            yield v
    +
    +    def __len__(self):
    +        return self.disk_count + len(self.values)
    +
    +    def append(self, value):
    +        self.values.append(value)
    +        # dump them into disk if the key is huge
    +        if len(self.values) >= self.LIMIT:
    +            self._spill()
    +
    +    def _open_file(self):
    +        dirs = _get_local_dirs("objects")
    +        d = dirs[id(self) % len(dirs)]
    +        if not os.path.exists(d):
    +            os.makedirs(d)
    +        p = os.path.join(d, str(id))
    +        self._file = open(p, "w+", 65536)
    +        self._ser = CompressedSerializer(PickleSerializer())
    +        os.unlink(p)
    --- End diff --
    
    Ah, I see.  From the [unlink](http://pubs.opengroup.org/onlinepubs/009695399/functions/unlink.html) spec:
    
    > When the file's link count becomes 0 and no process has the file open, the space occupied by the file shall be freed and the file shall no longer be accessible. If one or more processes have the file open when the last link is removed, the link shall be removed before unlink() returns, but the removal of the file contents shall be postponed until all references to the file are closed.
    
    This is a very clever trick!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-55982725
  
    @mateiz  In this patch, the values in SameKey can only be iterated once, I will fix this later.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by airhorns <gi...@git.apache.org>.
Github user airhorns commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-78116054
  
    Hey @davies this would be freakin' fantastic to get merged... any chance that might happen soon? We hit many issues with skewed group sizes causing a whole job to fail and in my brief experimentation I think this would make things far more stable for us. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by mkhaitman <gi...@git.apache.org>.
Github user mkhaitman commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1977#discussion_r27223759
  
    --- Diff: python/pyspark/shuffle.py ---
    @@ -244,72 +258,57 @@ def _next_limit(self):
     
         def mergeValues(self, iterator):
             """ Combine the items by creator and combiner """
    -        iterator = iter(iterator)
             # speedup attribute lookup
             creator, comb = self.agg.createCombiner, self.agg.mergeValue
    -        d, c, batch = self.data, 0, self.batch
    +        c, data, pdata, hfun, batch = 0, self.data, self.pdata, self._partition, self.batch
    +        limit = self.memory_limit
     
             for k, v in iterator:
    +            d = pdata[hfun(k)] if pdata else data
                 d[k] = comb(d[k], v) if k in d else creator(v)
     
                 c += 1
    -            if c % batch == 0 and get_used_memory() > self.memory_limit:
    -                self._spill()
    -                self._partitioned_mergeValues(iterator, self._next_limit())
    -                break
    +            if c >= batch:
    +                if get_used_memory() >= limit:
    +                    self._spill()
    +                    limit = self._next_limit()
    +                    batch /= 2
    +                    c = 0
    +                else:
    +                    batch *= 1.5
     
    --- End diff --
    
    Would it make sense to do a final memory check after the for loop, in case we just added another 999 items (worst case) without spilling any of it to disk? 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-55958078
  
    Summarizing some of our in-person discussion (@davies, let me know if I've made any mistakes here!):
    
    `GroupByKey` and `SameKey` work together to address the different ways that users might consume the result of the group-by.  The design goal here is laziness: don't load all of a group's values into a list unless the user calls something like `list()` on the values iterator.
    
    If a user does something like `mapValues(len)`, the first call to `GroupByKey.next()` will return a `(key, SameKey)` object.  If we then attempt to iterate over the values, `SameKey.next()` will read values from the group until it hits a value that belongs to a different group.  In this case, `SameKey.next()` will place the value into `GroupByKey. next_item` so that it's read on the next call to `GroupByKey.next()`.
    
    We also support cases consumes the entire `GroupByKey` iterator and then reads individual groups (e.g. call `glom()` on the results of the `groupByKey()`).  In these cases, we unroll the group values into a list (stored inside of `SameKey`).  If this unrolling would cause an OOM, `SameKey` spills to disk and reads the values back once the user consumes the values.  This feature helps to prevent OOMs when users write things like `x.groupByKey().count()` to count the number of distinct keys (there are _much_ more efficient ways of doing this, but at least now we won't fail if someone tries to do things the bad way).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by mkhaitman <gi...@git.apache.org>.
Github user mkhaitman commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-86688264
  
    @davies Sorry, I deleted the comment though you still received the notification. Think it was just a fluke since it didn't happen the second time. Sorry about that! So far looking good on joins.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-63164145
  
    I agree that this is a good fix; I've been letting the review slip because this PR is pretty complex and it will take me a decent amount of time to be sure that it's correct in all cases.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-91060947
  
    @JoshRosen the last comments had been addressed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by mkhaitman <gi...@git.apache.org>.
Github user mkhaitman commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-86236377
  
    This PR looks amazing! I'm going to test this out tomorrow with 1.3-rc3 and report back with some findings. I started taking a stab initially at trying to improve the shuffle memory handling a bit but it looks like I don't have to now!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-91052717
  
      [Test build #29895 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29895/consoleFull) for   PR 1977 at commit [`0dcf320`](https://github.com/apache/spark/commit/0dcf3208220a46cead751aacee7e7e3e22a5132b).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-58259774
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21397/consoleFull) for   PR 1977 at commit [`651f891`](https://github.com/apache/spark/commit/651f8919c0fe8b55967823f1f051ea85e0c4daba).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class ResultIterable(object):`
      * `class FlattedValuesSerializer(BatchedSerializer):`
      * `class ExternalList(object):`
      * `class GroupByKey(object):`
      * `class ChainedIterable(object):`
      * `class ExternalGroupBy(ExternalMerger):`



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-91392044
  
    Great thanks to test it, it did help us to find a bug!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-91050249
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/29891/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-55554117
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/106/consoleFull) for   PR 1977 at commit [`8ef965e`](https://github.com/apache/spark/commit/8ef965e163f68616c7ebb6e65c83314c0c66c3de).
     * This patch **fails** unit tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class ResultIterable(object):`
      * `class FlattedValuesSerializer(BatchedSerializer):`
      * `class SameKey(object):`
      * `class GroupByKey(object):`
      * `class ExternalGroupBy(ExternalMerger):`



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-78118374
  
     Ping @JoshRosen


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1977#discussion_r28032910
  
    --- Diff: python/pyspark/shuffle.py ---
    @@ -529,6 +522,322 @@ def sorted(self, iterator, key=None, reverse=False):
             return heapq.merge(chunks, key=key, reverse=reverse)
     
     
    +class ExternalList(object):
    +    """
    +    ExternalList can have many items which cannot be hold in memory in
    +    the same time.
    +
    +    >>> l = ExternalList(range(100))
    +    >>> len(l)
    +    100
    +    >>> l.append(10)
    +    >>> len(l)
    +    101
    +    >>> for i in range(10240):
    +    ...     l.append(i)
    +    >>> len(l)
    +    10341
    +    >>> import pickle
    +    >>> l2 = pickle.loads(pickle.dumps(l))
    +    >>> len(l2)
    +    10341
    +    >>> list(l2)[100]
    +    10
    +    """
    +    LIMIT = 10240
    +
    +    def __init__(self, values):
    +        self.values = values
    +        self.count = len(values)
    +        self._file = None
    +        self._ser = None
    +
    +    def __getstate__(self):
    +        if self._file is not None:
    +            self._file.flush()
    +            f = os.fdopen(os.dup(self._file.fileno()))
    +            f.seek(0)
    +            serialized = f.read()
    +        else:
    +            serialized = ''
    +        return self.values, self.count, serialized
    +
    +    def __setstate__(self, item):
    +        self.values, self.count, serialized = item
    +        if serialized:
    +            self._open_file()
    +            self._file.write(serialized)
    +        else:
    +            self._file = None
    +            self._ser = None
    +
    +    def __iter__(self):
    +        if self._file is not None:
    +            self._file.flush()
    +            # read all items from disks first
    +            with os.fdopen(os.dup(self._file.fileno()), 'r') as f:
    +                f.seek(0)
    +                for values in self._ser.load_stream(f):
    +                    for v in values:
    +                        yield v
    +
    +        for v in self.values:
    +            yield v
    +
    +    def __len__(self):
    +        return self.count
    +
    +    def append(self, value):
    +        self.values.append(value)
    +        self.count += 1
    +        # dump them into disk if the key is huge
    +        if len(self.values) >= self.LIMIT:
    +            self._spill()
    +
    +    def _open_file(self):
    +        dirs = _get_local_dirs("objects")
    +        d = dirs[id(self) % len(dirs)]
    +        if not os.path.exists(d):
    +            os.makedirs(d)
    +        p = os.path.join(d, str(id))
    +        self._file = open(p, "w+", 65536)
    +        self._ser = CompressedSerializer(PickleSerializer())
    +        os.unlink(p)
    +
    +    def _spill(self):
    +        """ dump the values into disk """
    +        global MemoryBytesSpilled, DiskBytesSpilled
    +        if self._file is None:
    +            self._open_file()
    +
    +        used_memory = get_used_memory()
    +        pos = self._file.tell()
    +        self._ser.dump_stream([self.values], self._file)
    +        self.values = []
    +        gc.collect()
    +        DiskBytesSpilled += self._file.tell() - pos
    +        MemoryBytesSpilled += (used_memory - get_used_memory()) << 20
    +
    +
    +class ExternalListOfList(ExternalList):
    +    """
    +    An external list for list.
    +    """
    +    def __init__(self, values):
    +        ExternalList.__init__(self, values)
    +        self.count = sum(len(i) for i in values)
    +
    +    def append(self, value):
    +        ExternalList.append(self, value)
    +        # already counted 1 in ExternalList.append
    +        self.count += len(value) - 1
    +
    +
    +class GroupByKey(object):
    --- End diff --
    
    Yes, I'd want to the code and test of ExternalList and GroupByKey could be easy to understand.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-60722431
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22348/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-52864579
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19014/consoleFull) for   PR 1977 at commit [`acd8e1b`](https://github.com/apache/spark/commit/acd8e1b8f920711ea57fc2bb75c1749c077b372e).
     * This patch **passes** unit tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `  shift # Ignore main class (org.apache.spark.deploy.SparkSubmit) and use our own`
      * `case class SparkListenerTaskStart(stageId: Int, stageAttemptId: Int, taskInfo: TaskInfo)`
      * `class ResultIterable(object):`
      * `class FlattedValuesSerializer(BatchedSerializer):`
      * `class ExternalSorter(object):`
      * `class SameKey(object):`
      * `class GroupByKey(object):`
      * `class ExternalGroupBy(ExternalMerger):`



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-62460926
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/23156/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1977#discussion_r27998013
  
    --- Diff: python/pyspark/rdd.py ---
    @@ -1755,21 +1753,33 @@ def createZero():
     
             return self.combineByKey(lambda v: func(createZero(), v), func, func, numPartitions)
     
    +    def _can_spill(self):
    +        return self.ctx._conf.get("spark.shuffle.spill", "True").lower() == "true"
    +
    +    def _memory_limit(self):
    +        return _parse_memory(self.ctx._conf.get("spark.python.worker.memory", "512m"))
    +
         # TODO: support variant with custom partitioner
         def groupByKey(self, numPartitions=None):
             """
             Group the values for each key in the RDD into a single sequence.
    -        Hash-partitions the resulting RDD with into numPartitions partitions.
    +        Hash-partitions the resulting RDD with numPartitions partitions.
    +
    +        The values in the resulting RDD is iterable object L{ResultIterable},
    +        they can be iterated only once. The `len(values)` will result in
    --- End diff --
    
    Is this still true?  [Upthread](https://github.com/apache/spark/pull/1977#issuecomment-56004283), you mentioned that this can now be iterated multiple times, so we should probably update this comment.  It would also be nice to add a unit test to illustrate that the calling `len(values)` then reading the values will work, even if it's inefficient (you might have already done this in tests, though; I haven't checked).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-56773667
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/20779/


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1977#discussion_r27994956
  
    --- Diff: python/pyspark/shuffle.py ---
    @@ -529,6 +520,295 @@ def sorted(self, iterator, key=None, reverse=False):
             return heapq.merge(chunks, key=key, reverse=reverse)
     
     
    +class ExternalList(object):
    +    """
    +    ExternalList can have many items which cannot be hold in memory in
    +    the same time.
    +
    +    >>> l = ExternalList(range(100))
    +    >>> len(l)
    +    100
    +    >>> l.append(10)
    +    >>> len(l)
    +    101
    +    >>> for i in range(10240):
    +    ...     l.append(i)
    +    >>> len(l)
    +    10341
    +    >>> import pickle
    +    >>> l2 = pickle.loads(pickle.dumps(l))
    +    >>> len(l2)
    +    10341
    +    >>> list(l2)[100]
    +    10
    +    """
    +    LIMIT = 10240
    +
    +    def __init__(self, values):
    +        self.values = values
    +        self.disk_count = 0
    +        self._file = None
    +        self._ser = None
    +
    +    def __getstate__(self):
    +        if self._file is not None:
    +            self._file.flush()
    +            f = os.fdopen(os.dup(self._file.fileno()))
    +            f.seek(0)
    +            bytes = f.read()
    --- End diff --
    
    I think `bytes` is a reserved word in Python, so maybe we should call this `_bytes` instead.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1977#discussion_r27995826
  
    --- Diff: python/pyspark/shuffle.py ---
    @@ -529,6 +520,295 @@ def sorted(self, iterator, key=None, reverse=False):
             return heapq.merge(chunks, key=key, reverse=reverse)
     
     
    +class ExternalList(object):
    +    """
    +    ExternalList can have many items which cannot be hold in memory in
    +    the same time.
    +
    +    >>> l = ExternalList(range(100))
    +    >>> len(l)
    +    100
    +    >>> l.append(10)
    +    >>> len(l)
    +    101
    +    >>> for i in range(10240):
    +    ...     l.append(i)
    +    >>> len(l)
    +    10341
    +    >>> import pickle
    +    >>> l2 = pickle.loads(pickle.dumps(l))
    +    >>> len(l2)
    +    10341
    +    >>> list(l2)[100]
    +    10
    +    """
    +    LIMIT = 10240
    +
    +    def __init__(self, values):
    +        self.values = values
    +        self.disk_count = 0
    +        self._file = None
    +        self._ser = None
    +
    +    def __getstate__(self):
    +        if self._file is not None:
    +            self._file.flush()
    +            f = os.fdopen(os.dup(self._file.fileno()))
    +            f.seek(0)
    +            bytes = f.read()
    +        else:
    +            bytes = ''
    +        return self.values, self.disk_count, bytes
    +
    +    def __setstate__(self, item):
    +        self.values, self.disk_count, bytes = item
    +        if bytes:
    +            self._open_file()
    +            self._file.write(bytes)
    +        else:
    +            self._file = None
    +            self._ser = None
    +
    +    def __iter__(self):
    +        if self._file is not None:
    +            self._file.flush()
    +            # read all items from disks first
    +            with os.fdopen(os.dup(self._file.fileno()), 'r') as f:
    +                f.seek(0)
    +                for values in self._ser.load_stream(f):
    +                    for v in values:
    +                        yield v
    +
    +        for v in self.values:
    +            yield v
    +
    +    def __len__(self):
    +        return self.disk_count + len(self.values)
    +
    +    def append(self, value):
    +        self.values.append(value)
    +        # dump them into disk if the key is huge
    +        if len(self.values) >= self.LIMIT:
    +            self._spill()
    +
    +    def _open_file(self):
    +        dirs = _get_local_dirs("objects")
    +        d = dirs[id(self) % len(dirs)]
    +        if not os.path.exists(d):
    +            os.makedirs(d)
    +        p = os.path.join(d, str(id))
    +        self._file = open(p, "w+", 65536)
    +        self._ser = CompressedSerializer(PickleSerializer())
    +        os.unlink(p)
    --- End diff --
    
    Why do we need this `unlink` call here?  Is this to delete the file if it already exists?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1977#discussion_r28001568
  
    --- Diff: python/pyspark/serializers.py ---
    @@ -220,6 +220,29 @@ def __repr__(self):
             return "BatchedSerializer(%s, %d)" % (str(self.serializer), self.batchSize)
     
     
    +class FlattedValuesSerializer(BatchedSerializer):
    --- End diff --
    
    Can we rename this to `FlattenedValuesSerializer` (with the missing `en`)?  Should be a pretty simple find-and-replace, but we should also update the other occurrences of `flatted` to be `flattened`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-90766727
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/29816/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-90766722
  
      [Test build #29816 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29816/consoleFull) for   PR 1977 at commit [`e3b8eab`](https://github.com/apache/spark/commit/e3b8eab0677d39eed0476f5cf1a9f3e4ad963bdc).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.
     * This patch does not change any dependencies.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-91036610
  
      [Test build #29891 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29891/consoleFull) for   PR 1977 at commit [`0dcf320`](https://github.com/apache/spark/commit/0dcf3208220a46cead751aacee7e7e3e22a5132b).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1977#discussion_r28019740
  
    --- Diff: python/pyspark/shuffle.py ---
    @@ -529,6 +522,301 @@ def sorted(self, iterator, key=None, reverse=False):
             return heapq.merge(chunks, key=key, reverse=reverse)
     
     
    +class ExternalList(object):
    +    """
    +    ExternalList can have many items which cannot be hold in memory in
    +    the same time.
    +
    +    >>> l = ExternalList(range(100))
    +    >>> len(l)
    +    100
    +    >>> l.append(10)
    +    >>> len(l)
    +    101
    +    >>> for i in range(10240):
    +    ...     l.append(i)
    +    >>> len(l)
    +    10341
    +    >>> import pickle
    +    >>> l2 = pickle.loads(pickle.dumps(l))
    +    >>> len(l2)
    +    10341
    +    >>> list(l2)[100]
    +    10
    +    """
    +    LIMIT = 10240
    +
    +    def __init__(self, values):
    +        self.values = values
    +        if values and isinstance(values[0], list):
    +            self.count = sum(len(i) for i in values)
    +        else:
    +            self.count = len(values)
    +        self._file = None
    +        self._ser = None
    +
    +    def __getstate__(self):
    +        if self._file is not None:
    +            self._file.flush()
    +            f = os.fdopen(os.dup(self._file.fileno()))
    +            f.seek(0)
    +            serialized = f.read()
    +        else:
    +            serialized = ''
    +        return self.values, self.count, serialized
    +
    +    def __setstate__(self, item):
    +        self.values, self.count, serialized = item
    +        if serialized:
    +            self._open_file()
    +            self._file.write(serialized)
    +        else:
    +            self._file = None
    +            self._ser = None
    +
    +    def __iter__(self):
    +        if self._file is not None:
    +            self._file.flush()
    +            # read all items from disks first
    +            with os.fdopen(os.dup(self._file.fileno()), 'r') as f:
    +                f.seek(0)
    +                for values in self._ser.load_stream(f):
    +                    for v in values:
    +                        yield v
    +
    +        for v in self.values:
    +            yield v
    +
    +    def __len__(self):
    +        return self.count
    +
    +    def append(self, value):
    +        self.values.append(value)
    +        self.count += len(value) if isinstance(value, list) else 1
    --- End diff --
    
    Why do we add the length of `value` if it's a list?  This would make sense for `extend()`, but I find it a bit confusing for `append()`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-91363332
  
      [Test build #29967 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29967/consoleFull) for   PR 1977 at commit [`af3713a`](https://github.com/apache/spark/commit/af3713a1ee73cc0b4fe43e61dd82eab7924aa379).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-68407892
  
      [Test build #557 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/557/consoleFull) for   PR 1977 at commit [`2b9c261`](https://github.com/apache/spark/commit/2b9c261497fab4bb8d310d30e0017aa3b3d2f37f).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class ResultIterable(object):`
      * `class FlattedValuesSerializer(BatchedSerializer):`
      * `class ExternalList(object):`
      * `class GroupByKey(object):`
      * `class ChainedIterable(object):`
      * `class ExternalGroupBy(ExternalMerger):`



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-60472642
  
      [Test build #22199 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22199/consoleFull) for   PR 1977 at commit [`ab5515b`](https://github.com/apache/spark/commit/ab5515b4eaf93e95a9d9e04f11538119ca25030b).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-58250188
  
    I had simplify GroupByKey, it's much readable now.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-55560776
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20328/consoleFull) for   PR 1977 at commit [`4d4bc86`](https://github.com/apache/spark/commit/4d4bc8671a4ef7e9d2d9924681bed1f8e4695a20).
     * This patch **passes** unit tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class ResultIterable(object):`
      * `class FlattedValuesSerializer(BatchedSerializer):`
      * `class SameKey(object):`
      * `class GroupByKey(object):`
      * `class ExternalGroupBy(ExternalMerger):`



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-68400348
  
    **[Test build #24904 timed out](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24904/consoleFull)**     for PR 1977 at commit [`2b9c261`](https://github.com/apache/spark/commit/2b9c261497fab4bb8d310d30e0017aa3b3d2f37f)     after a configured wait of `120m`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-52377774
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18646/consoleFull) for   PR 1977 at commit [`d05060d`](https://github.com/apache/spark/commit/d05060db533858f31bc4b3ff7a32c24412499187).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-61682571
  
      [Test build #22878 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22878/consoleFull) for   PR 1977 at commit [`a14b4bd`](https://github.com/apache/spark/commit/a14b4bdd1cb72f41a5449326888d600cbe183062).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class ResultIterable(object):`
      * `class FlattedValuesSerializer(BatchedSerializer):`
      * `class ExternalList(object):`
      * `class GroupByKey(object):`
      * `class ChainedIterable(object):`
      * `class ExternalGroupBy(ExternalMerger):`



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-56780803
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20781/consoleFull) for   PR 1977 at commit [`1f69f93`](https://github.com/apache/spark/commit/1f69f93b637e45680b71f52246469f379f883e02).
     * This patch **passes** unit tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class ResultIterable(object):`
      * `class FlattedValuesSerializer(BatchedSerializer):`
      * `class SameKey(object):`
      * `class ChainedIterable(object):`
      * `class GroupByKey(object):`
      * `class ExternalGroupBy(ExternalMerger):`



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-91108728
  
      [Test build #29921 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29921/consoleFull) for   PR 1977 at commit [`e78c15c`](https://github.com/apache/spark/commit/e78c15c41ff0c1cf393bb6e65eb513ebd8357c75).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1977#discussion_r28027636
  
    --- Diff: python/pyspark/shuffle.py ---
    @@ -529,6 +522,322 @@ def sorted(self, iterator, key=None, reverse=False):
             return heapq.merge(chunks, key=key, reverse=reverse)
     
     
    +class ExternalList(object):
    +    """
    +    ExternalList can have many items which cannot be hold in memory in
    +    the same time.
    +
    +    >>> l = ExternalList(range(100))
    +    >>> len(l)
    +    100
    +    >>> l.append(10)
    +    >>> len(l)
    +    101
    +    >>> for i in range(10240):
    +    ...     l.append(i)
    +    >>> len(l)
    +    10341
    +    >>> import pickle
    +    >>> l2 = pickle.loads(pickle.dumps(l))
    +    >>> len(l2)
    +    10341
    +    >>> list(l2)[100]
    +    10
    +    """
    +    LIMIT = 10240
    +
    +    def __init__(self, values):
    +        self.values = values
    +        self.count = len(values)
    +        self._file = None
    +        self._ser = None
    +
    +    def __getstate__(self):
    +        if self._file is not None:
    +            self._file.flush()
    +            f = os.fdopen(os.dup(self._file.fileno()))
    +            f.seek(0)
    +            serialized = f.read()
    +        else:
    +            serialized = ''
    +        return self.values, self.count, serialized
    +
    +    def __setstate__(self, item):
    +        self.values, self.count, serialized = item
    +        if serialized:
    +            self._open_file()
    +            self._file.write(serialized)
    +        else:
    +            self._file = None
    +            self._ser = None
    +
    +    def __iter__(self):
    +        if self._file is not None:
    +            self._file.flush()
    +            # read all items from disks first
    +            with os.fdopen(os.dup(self._file.fileno()), 'r') as f:
    +                f.seek(0)
    +                for values in self._ser.load_stream(f):
    +                    for v in values:
    +                        yield v
    +
    +        for v in self.values:
    +            yield v
    +
    +    def __len__(self):
    +        return self.count
    +
    +    def append(self, value):
    +        self.values.append(value)
    +        self.count += 1
    +        # dump them into disk if the key is huge
    +        if len(self.values) >= self.LIMIT:
    +            self._spill()
    +
    +    def _open_file(self):
    +        dirs = _get_local_dirs("objects")
    +        d = dirs[id(self) % len(dirs)]
    +        if not os.path.exists(d):
    +            os.makedirs(d)
    +        p = os.path.join(d, str(id))
    +        self._file = open(p, "w+", 65536)
    +        self._ser = CompressedSerializer(PickleSerializer())
    +        os.unlink(p)
    +
    +    def _spill(self):
    +        """ dump the values into disk """
    +        global MemoryBytesSpilled, DiskBytesSpilled
    +        if self._file is None:
    +            self._open_file()
    +
    +        used_memory = get_used_memory()
    +        pos = self._file.tell()
    +        self._ser.dump_stream([self.values], self._file)
    +        self.values = []
    +        gc.collect()
    +        DiskBytesSpilled += self._file.tell() - pos
    +        MemoryBytesSpilled += (used_memory - get_used_memory()) << 20
    +
    +
    +class ExternalListOfList(ExternalList):
    +    """
    +    An external list for list.
    +    """
    +    def __init__(self, values):
    +        ExternalList.__init__(self, values)
    +        self.count = sum(len(i) for i in values)
    +
    +    def append(self, value):
    +        ExternalList.append(self, value)
    +        # already counted 1 in ExternalList.append
    +        self.count += len(value) - 1
    +
    +
    +class GroupByKey(object):
    --- End diff --
    
    It looks like we only directly use `GroupByKey` in tests, while the actual shuffle code only uses `GroupListsByKey`.  Is this intentional?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1977#discussion_r28032966
  
    --- Diff: python/pyspark/shuffle.py ---
    @@ -529,6 +522,322 @@ def sorted(self, iterator, key=None, reverse=False):
             return heapq.merge(chunks, key=key, reverse=reverse)
     
     
    +class ExternalList(object):
    +    """
    +    ExternalList can have many items which cannot be hold in memory in
    +    the same time.
    +
    +    >>> l = ExternalList(range(100))
    +    >>> len(l)
    +    100
    +    >>> l.append(10)
    +    >>> len(l)
    +    101
    +    >>> for i in range(10240):
    +    ...     l.append(i)
    +    >>> len(l)
    +    10341
    +    >>> import pickle
    +    >>> l2 = pickle.loads(pickle.dumps(l))
    +    >>> len(l2)
    +    10341
    +    >>> list(l2)[100]
    +    10
    +    """
    +    LIMIT = 10240
    +
    +    def __init__(self, values):
    +        self.values = values
    +        self.count = len(values)
    +        self._file = None
    +        self._ser = None
    +
    +    def __getstate__(self):
    +        if self._file is not None:
    +            self._file.flush()
    +            f = os.fdopen(os.dup(self._file.fileno()))
    +            f.seek(0)
    +            serialized = f.read()
    +        else:
    +            serialized = ''
    +        return self.values, self.count, serialized
    +
    +    def __setstate__(self, item):
    +        self.values, self.count, serialized = item
    +        if serialized:
    +            self._open_file()
    +            self._file.write(serialized)
    +        else:
    +            self._file = None
    +            self._ser = None
    +
    +    def __iter__(self):
    +        if self._file is not None:
    +            self._file.flush()
    +            # read all items from disks first
    +            with os.fdopen(os.dup(self._file.fileno()), 'r') as f:
    +                f.seek(0)
    +                for values in self._ser.load_stream(f):
    +                    for v in values:
    +                        yield v
    +
    +        for v in self.values:
    +            yield v
    +
    +    def __len__(self):
    +        return self.count
    +
    +    def append(self, value):
    +        self.values.append(value)
    +        self.count += 1
    +        # dump them into disk if the key is huge
    +        if len(self.values) >= self.LIMIT:
    +            self._spill()
    +
    +    def _open_file(self):
    +        dirs = _get_local_dirs("objects")
    +        d = dirs[id(self) % len(dirs)]
    +        if not os.path.exists(d):
    +            os.makedirs(d)
    +        p = os.path.join(d, str(id))
    +        self._file = open(p, "w+", 65536)
    +        self._ser = CompressedSerializer(PickleSerializer())
    +        os.unlink(p)
    +
    +    def _spill(self):
    +        """ dump the values into disk """
    +        global MemoryBytesSpilled, DiskBytesSpilled
    +        if self._file is None:
    +            self._open_file()
    +
    +        used_memory = get_used_memory()
    +        pos = self._file.tell()
    +        self._ser.dump_stream([self.values], self._file)
    +        self.values = []
    +        gc.collect()
    +        DiskBytesSpilled += self._file.tell() - pos
    +        MemoryBytesSpilled += (used_memory - get_used_memory()) << 20
    +
    +
    +class ExternalListOfList(ExternalList):
    +    """
    +    An external list for list.
    +    """
    +    def __init__(self, values):
    +        ExternalList.__init__(self, values)
    +        self.count = sum(len(i) for i in values)
    +
    +    def append(self, value):
    +        ExternalList.append(self, value)
    --- End diff --
    
    I can change the __iter__ of ExternalListOfList, then we do not need ChainedIterator anymore.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-90740407
  
      [Test build #637 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/637/consoleFull) for   PR 1977 at commit [`2a1857a`](https://github.com/apache/spark/commit/2a1857a4c1d72bee5a93ffbe1da013ffc7ff673b).
     * This patch **fails PySpark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.
     * This patch **adds the following new dependencies:**
       * `activation-1.1.jar`
       * `aopalliance-1.0.jar`
       * `avro-1.7.7.jar`
       * `breeze-macros_2.10-0.11.2.jar`
       * `breeze_2.10-0.11.2.jar`
       * `commons-cli-1.2.jar`
       * `commons-compress-1.4.1.jar`
       * `commons-io-2.1.jar`
       * `commons-lang-2.5.jar`
       * `gmbal-api-only-3.0.0-b023.jar`
       * `grizzly-framework-2.1.2.jar`
       * `grizzly-http-2.1.2.jar`
       * `grizzly-http-server-2.1.2.jar`
       * `grizzly-http-servlet-2.1.2.jar`
       * `grizzly-rcm-2.1.2.jar`
       * `guice-3.0.jar`
       * `hadoop-annotations-2.2.0.jar`
       * `hadoop-auth-2.2.0.jar`
       * `hadoop-client-2.2.0.jar`
       * `hadoop-common-2.2.0.jar`
       * `hadoop-hdfs-2.2.0.jar`
       * `hadoop-mapreduce-client-app-2.2.0.jar`
       * `hadoop-mapreduce-client-common-2.2.0.jar`
       * `hadoop-mapreduce-client-core-2.2.0.jar`
       * `hadoop-mapreduce-client-jobclient-2.2.0.jar`
       * `hadoop-mapreduce-client-shuffle-2.2.0.jar`
       * `hadoop-yarn-api-2.2.0.jar`
       * `hadoop-yarn-client-2.2.0.jar`
       * `hadoop-yarn-common-2.2.0.jar`
       * `hadoop-yarn-server-common-2.2.0.jar`
       * `jackson-jaxrs-1.8.8.jar`
       * `jackson-xc-1.8.8.jar`
       * `javax.inject-1.jar`
       * `javax.servlet-3.1.jar`
       * `javax.servlet-api-3.0.1.jar`
       * `jaxb-api-2.2.2.jar`
       * `jaxb-impl-2.2.3-1.jar`
       * `jersey-client-1.9.jar`
       * `jersey-core-1.9.jar`
       * `jersey-grizzly2-1.9.jar`
       * `jersey-guice-1.9.jar`
       * `jersey-json-1.9.jar`
       * `jersey-server-1.9.jar`
       * `jersey-test-framework-core-1.9.jar`
       * `jersey-test-framework-grizzly2-1.9.jar`
       * `jettison-1.1.jar`
       * `jetty-util-6.1.26.jar`
       * `management-api-3.0.0-b012.jar`
       * `protobuf-java-2.4.1.jar`
       * `spark-bagel_2.10-1.4.0-SNAPSHOT.jar`
       * `spark-catalyst_2.10-1.4.0-SNAPSHOT.jar`
       * `spark-core_2.10-1.4.0-SNAPSHOT.jar`
       * `spark-graphx_2.10-1.4.0-SNAPSHOT.jar`
       * `spark-launcher_2.10-1.4.0-SNAPSHOT.jar`
       * `spark-mllib_2.10-1.4.0-SNAPSHOT.jar`
       * `spark-network-common_2.10-1.4.0-SNAPSHOT.jar`
       * `spark-network-shuffle_2.10-1.4.0-SNAPSHOT.jar`
       * `spark-repl_2.10-1.4.0-SNAPSHOT.jar`
       * `spark-sql_2.10-1.4.0-SNAPSHOT.jar`
       * `spark-streaming_2.10-1.4.0-SNAPSHOT.jar`
       * `stax-api-1.0.1.jar`
       * `xz-1.0.jar`
    
     * This patch **removes the following dependencies:**
       * `breeze-macros_2.10-0.11.1.jar`
       * `breeze_2.10-0.11.1.jar`
       * `commons-el-1.0.jar`
       * `commons-io-2.4.jar`
       * `commons-lang-2.4.jar`
       * `hadoop-client-1.0.4.jar`
       * `hadoop-core-1.0.4.jar`
       * `hsqldb-1.8.0.10.jar`
       * `jblas-1.2.3.jar`
       * `spark-bagel_2.10-1.3.0-SNAPSHOT.jar`
       * `spark-catalyst_2.10-1.3.0-SNAPSHOT.jar`
       * `spark-core_2.10-1.3.0-SNAPSHOT.jar`
       * `spark-graphx_2.10-1.3.0-SNAPSHOT.jar`
       * `spark-launcher_2.10-1.3.0-SNAPSHOT.jar`
       * `spark-mllib_2.10-1.3.0-SNAPSHOT.jar`
       * `spark-network-common_2.10-1.3.0-SNAPSHOT.jar`
       * `spark-network-shuffle_2.10-1.3.0-SNAPSHOT.jar`
       * `spark-repl_2.10-1.3.0-SNAPSHOT.jar`
       * `spark-sql_2.10-1.3.0-SNAPSHOT.jar`
       * `spark-streaming_2.10-1.3.0-SNAPSHOT.jar`



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-56009374
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20530/consoleFull) for   PR 1977 at commit [`47918b8`](https://github.com/apache/spark/commit/47918b83b21674347cbd742d40e13ee36e9e58ee).
     * This patch **passes** unit tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class ResultIterable(object):`
      * `class FlattedValuesSerializer(BatchedSerializer):`
      * `class SameKey(object):`
      * `class ChainedIterable(object):`
      * `class GroupByKey(object):`
      * `class ExternalGroupBy(ExternalMerger):`



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/1977


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1977#discussion_r28010925
  
    --- Diff: python/pyspark/shuffle.py ---
    @@ -367,32 +372,13 @@ def iteritems(self):
     
         def _external_items(self):
             """ Return all partitioned items as iterator """
    -        assert not self.data
    --- End diff --
    
    The assumption is still true, I can add that back.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-56770877
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20779/consoleFull) for   PR 1977 at commit [`0d3395f`](https://github.com/apache/spark/commit/0d3395f32f587403714ac58c6d4f914ba235ceea).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-58115490
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21348/consoleFull) for   PR 1977 at commit [`1578f2e`](https://github.com/apache/spark/commit/1578f2e722b99732d9455186bcf5c085670fb8a3).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class ResultIterable(object):`
      * `class FlattedValuesSerializer(BatchedSerializer):`
      * `class SameKey(object):`
      * `class ChainedIterable(object):`
      * `class GroupByKey(object):`
      * `class ExternalGroupBy(ExternalMerger):`
      * `case class CacheTableCommand(tableName: String, plan: Option[LogicalPlan], isLazy: Boolean)`
      * `case class UncacheTableCommand(tableName: String) extends Command`
      * `case class CacheTableCommand(`
      * `case class UncacheTableCommand(tableName: String) extends LeafNode with Command `
      * `case class DescribeCommand(child: SparkPlan, output: Seq[Attribute])(`



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-55556598
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20328/consoleFull) for   PR 1977 at commit [`4d4bc86`](https://github.com/apache/spark/commit/4d4bc8671a4ef7e9d2d9924681bed1f8e4695a20).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-91050238
  
      [Test build #29891 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29891/consoleFull) for   PR 1977 at commit [`0dcf320`](https://github.com/apache/spark/commit/0dcf3208220a46cead751aacee7e7e3e22a5132b).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class FlattenedValuesSerializer(BatchedSerializer):`
      * `class ExternalList(object):`
      * `class GroupByKey(object):`
      * `class ChainedIterable(object):`
      * `class ExternalGroupBy(ExternalMerger):`
    
     * This patch does not change any dependencies.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-55984553
  
    We can't merge this patch until that's done then, because that would be a regression. In general we try to keep even master free of regressions because quite a few people run it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-56780634
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/151/consoleFull) for   PR 1977 at commit [`0d3395f`](https://github.com/apache/spark/commit/0d3395f32f587403714ac58c6d4f914ba235ceea).
     * This patch **passes** unit tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class ResultIterable(object):`
      * `class FlattedValuesSerializer(BatchedSerializer):`
      * `class SameKey(object):`
      * `class ChainedIterable(object):`
      * `class GroupByKey(object):`
      * `class ExternalGroupBy(ExternalMerger):`



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-90371821
  
      [Test build #29779 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29779/consoleFull) for   PR 1977 at commit [`d2f053b`](https://github.com/apache/spark/commit/d2f053b3eb39ff0c42a3e5df27f0ed4accc1c1d5).
     * This patch **fails PySpark unit tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class ResultIterable(object):`
      * `class FlattedValuesSerializer(BatchedSerializer):`
      * `class ExternalList(object):`
      * `class GroupByKey(object):`
      * `class ChainedIterable(object):`
      * `class ExternalGroupBy(ExternalMerger):`
    
     * This patch does not change any dependencies.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-61345063
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22651/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-56004283
  
    @JoshRosen  @mateiz I had addressed all your comments. The IResulterIterator can be iterated multiple times now, also can be pickled.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1977#discussion_r28033500
  
    --- Diff: python/pyspark/shuffle.py ---
    @@ -529,6 +522,322 @@ def sorted(self, iterator, key=None, reverse=False):
             return heapq.merge(chunks, key=key, reverse=reverse)
     
     
    +class ExternalList(object):
    +    """
    +    ExternalList can have many items which cannot be hold in memory in
    +    the same time.
    +
    +    >>> l = ExternalList(range(100))
    +    >>> len(l)
    +    100
    +    >>> l.append(10)
    +    >>> len(l)
    +    101
    +    >>> for i in range(10240):
    +    ...     l.append(i)
    +    >>> len(l)
    +    10341
    +    >>> import pickle
    +    >>> l2 = pickle.loads(pickle.dumps(l))
    +    >>> len(l2)
    +    10341
    +    >>> list(l2)[100]
    +    10
    +    """
    +    LIMIT = 10240
    +
    +    def __init__(self, values):
    +        self.values = values
    +        self.count = len(values)
    +        self._file = None
    +        self._ser = None
    +
    +    def __getstate__(self):
    +        if self._file is not None:
    +            self._file.flush()
    +            f = os.fdopen(os.dup(self._file.fileno()))
    +            f.seek(0)
    +            serialized = f.read()
    +        else:
    +            serialized = ''
    +        return self.values, self.count, serialized
    +
    +    def __setstate__(self, item):
    +        self.values, self.count, serialized = item
    +        if serialized:
    +            self._open_file()
    +            self._file.write(serialized)
    +        else:
    +            self._file = None
    +            self._ser = None
    +
    +    def __iter__(self):
    +        if self._file is not None:
    +            self._file.flush()
    +            # read all items from disks first
    +            with os.fdopen(os.dup(self._file.fileno()), 'r') as f:
    +                f.seek(0)
    +                for values in self._ser.load_stream(f):
    +                    for v in values:
    +                        yield v
    +
    +        for v in self.values:
    +            yield v
    +
    +    def __len__(self):
    +        return self.count
    +
    +    def append(self, value):
    +        self.values.append(value)
    +        self.count += 1
    +        # dump them into disk if the key is huge
    +        if len(self.values) >= self.LIMIT:
    +            self._spill()
    +
    +    def _open_file(self):
    +        dirs = _get_local_dirs("objects")
    +        d = dirs[id(self) % len(dirs)]
    +        if not os.path.exists(d):
    +            os.makedirs(d)
    +        p = os.path.join(d, str(id))
    +        self._file = open(p, "w+", 65536)
    +        self._ser = CompressedSerializer(PickleSerializer())
    +        os.unlink(p)
    +
    +    def _spill(self):
    +        """ dump the values into disk """
    +        global MemoryBytesSpilled, DiskBytesSpilled
    +        if self._file is None:
    +            self._open_file()
    +
    +        used_memory = get_used_memory()
    +        pos = self._file.tell()
    +        self._ser.dump_stream([self.values], self._file)
    +        self.values = []
    +        gc.collect()
    +        DiskBytesSpilled += self._file.tell() - pos
    +        MemoryBytesSpilled += (used_memory - get_used_memory()) << 20
    +
    +
    +class ExternalListOfList(ExternalList):
    +    """
    +    An external list for list.
    +    """
    +    def __init__(self, values):
    +        ExternalList.__init__(self, values)
    +        self.count = sum(len(i) for i in values)
    +
    +    def append(self, value):
    +        ExternalList.append(self, value)
    --- End diff --
    
    done


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-61665751
  
      [Test build #22878 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22878/consoleFull) for   PR 1977 at commit [`a14b4bd`](https://github.com/apache/spark/commit/a14b4bdd1cb72f41a5449326888d600cbe183062).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-52717927
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18899/consoleFull) for   PR 1977 at commit [`f157fe7`](https://github.com/apache/spark/commit/f157fe7954502b72c420e0b96201b3d72a5365cd).
     * This patch **fails** unit tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class ResultIterable(object):`
      * `class FlattedValuesSerializer(BatchedSerializer):`
      * `class ExternalSorter(object):`
      * `class SameKey(object):`
      * `class GroupByKey(object):`
      * `class ExternalGroupBy(ExternalMerger):`



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-52717915
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18899/consoleFull) for   PR 1977 at commit [`f157fe7`](https://github.com/apache/spark/commit/f157fe7954502b72c420e0b96201b3d72a5365cd).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-90288434
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/29756/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-91133286
  
      [Test build #29925 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29925/consoleFull) for   PR 1977 at commit [`67772dd`](https://github.com/apache/spark/commit/67772dd9e5fc2157f1da960aebfa82b133615527).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class FlattenedValuesSerializer(BatchedSerializer):`
      * `class ExternalList(object):`
      * `class ExternalListOfList(ExternalList):`
      * `class GroupByKey(object):`
      * `class ExternalGroupBy(ExternalMerger):`
    
     * This patch does not change any dependencies.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-53515747
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19262/consoleFull) for   PR 1977 at commit [`b48cda5`](https://github.com/apache/spark/commit/b48cda52ed564bfadd7edc307ccae179ed6dd429).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1977#discussion_r28027737
  
    --- Diff: python/pyspark/shuffle.py ---
    @@ -529,6 +522,322 @@ def sorted(self, iterator, key=None, reverse=False):
             return heapq.merge(chunks, key=key, reverse=reverse)
     
     
    +class ExternalList(object):
    +    """
    +    ExternalList can have many items which cannot be hold in memory in
    +    the same time.
    +
    +    >>> l = ExternalList(range(100))
    +    >>> len(l)
    +    100
    +    >>> l.append(10)
    +    >>> len(l)
    +    101
    +    >>> for i in range(10240):
    +    ...     l.append(i)
    +    >>> len(l)
    +    10341
    +    >>> import pickle
    +    >>> l2 = pickle.loads(pickle.dumps(l))
    +    >>> len(l2)
    +    10341
    +    >>> list(l2)[100]
    +    10
    +    """
    +    LIMIT = 10240
    +
    +    def __init__(self, values):
    +        self.values = values
    +        self.count = len(values)
    +        self._file = None
    +        self._ser = None
    +
    +    def __getstate__(self):
    +        if self._file is not None:
    +            self._file.flush()
    +            f = os.fdopen(os.dup(self._file.fileno()))
    +            f.seek(0)
    +            serialized = f.read()
    +        else:
    +            serialized = ''
    +        return self.values, self.count, serialized
    +
    +    def __setstate__(self, item):
    +        self.values, self.count, serialized = item
    +        if serialized:
    +            self._open_file()
    +            self._file.write(serialized)
    +        else:
    +            self._file = None
    +            self._ser = None
    +
    +    def __iter__(self):
    +        if self._file is not None:
    +            self._file.flush()
    +            # read all items from disks first
    +            with os.fdopen(os.dup(self._file.fileno()), 'r') as f:
    +                f.seek(0)
    +                for values in self._ser.load_stream(f):
    +                    for v in values:
    +                        yield v
    +
    +        for v in self.values:
    +            yield v
    +
    +    def __len__(self):
    +        return self.count
    +
    +    def append(self, value):
    +        self.values.append(value)
    +        self.count += 1
    +        # dump them into disk if the key is huge
    +        if len(self.values) >= self.LIMIT:
    +            self._spill()
    +
    +    def _open_file(self):
    +        dirs = _get_local_dirs("objects")
    +        d = dirs[id(self) % len(dirs)]
    +        if not os.path.exists(d):
    +            os.makedirs(d)
    +        p = os.path.join(d, str(id))
    +        self._file = open(p, "w+", 65536)
    +        self._ser = CompressedSerializer(PickleSerializer())
    +        os.unlink(p)
    +
    +    def _spill(self):
    +        """ dump the values into disk """
    +        global MemoryBytesSpilled, DiskBytesSpilled
    +        if self._file is None:
    +            self._open_file()
    +
    +        used_memory = get_used_memory()
    +        pos = self._file.tell()
    +        self._ser.dump_stream([self.values], self._file)
    +        self.values = []
    +        gc.collect()
    +        DiskBytesSpilled += self._file.tell() - pos
    +        MemoryBytesSpilled += (used_memory - get_used_memory()) << 20
    +
    +
    +class ExternalListOfList(ExternalList):
    +    """
    +    An external list for list.
    +    """
    +    def __init__(self, values):
    +        ExternalList.__init__(self, values)
    +        self.count = sum(len(i) for i in values)
    +
    +    def append(self, value):
    +        ExternalList.append(self, value)
    --- End diff --
    
    I'm still a bit confused by the contract for this class.  If `count = N` and __len__ is defined as `self.count`, then I'd expect that our iterator would yield exactly `N` items.  It seems like this contract is either violated by this class or its superclass.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-52599265
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18823/consoleFull) for   PR 1977 at commit [`11ba318`](https://github.com/apache/spark/commit/11ba318a39cda531894f85b318415d74dfd1b082).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-55938918
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/125/consoleFull) for   PR 1977 at commit [`4d4bc86`](https://github.com/apache/spark/commit/4d4bc8671a4ef7e9d2d9924681bed1f8e4695a20).
     * This patch **passes** unit tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class ResultIterable(object):`
      * `class FlattedValuesSerializer(BatchedSerializer):`
      * `class SameKey(object):`
      * `class GroupByKey(object):`
      * `class ExternalGroupBy(ExternalMerger):`



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-52383067
  
    I believe this is one of those few things in Spark where python is ahead of Scala


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-62447893
  
    @JoshRosen @mateiz Could we make this into 1.2 ? it had sit here for 2 months.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [WIP] [SPARK-3074] [PySpark] support groupByKe...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-52594495
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18810/consoleFull) for   PR 1977 at commit [`1ea0669`](https://github.com/apache/spark/commit/1ea0669a72ce11e29e78da747f3d3e1d2f28df8a).
     * This patch **fails** unit tests.
     * This patch **does not** merge cleanly!



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-52859823
  
    Jenkins, test this please.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-52721511
  
    Jenkins, retest this please


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-90670585
  
      [Test build #29802 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29802/consoleFull) for   PR 1977 at commit [`2a1857a`](https://github.com/apache/spark/commit/2a1857a4c1d72bee5a93ffbe1da013ffc7ff673b).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-90371839
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/29779/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-90293365
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/29757/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-58248987
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21397/consoleFull) for   PR 1977 at commit [`651f891`](https://github.com/apache/spark/commit/651f8919c0fe8b55967823f1f051ea85e0c4daba).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-91363104
  
    @JoshRosen Good catch! fixed it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [WIP] [SPARK-3074] [PySpark] support groupByKe...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-52591155
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18810/consoleFull) for   PR 1977 at commit [`1ea0669`](https://github.com/apache/spark/commit/1ea0669a72ce11e29e78da747f3d3e1d2f28df8a).
     * This patch **does not** merge cleanly!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-55958996
  
    This looks like a good patch.  The code here is fairly complicated and had some complex control flow, although after discussion I believe that it works correctly.  It would be great if you could add some more comments explaining, say, which parts of the code are only called once, which branches are taken under which scenarios, etc.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-90293354
  
    **[Test build #29757 timed out](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29757/consoleFull)**     for PR 1977 at commit [`d9589ab`](https://github.com/apache/spark/commit/d9589ab0dbcd6aa443906c3b1ec432d6c68f0587)     after a configured wait of `120m`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-90258222
  
      [Test build #29757 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29757/consoleFull) for   PR 1977 at commit [`d9589ab`](https://github.com/apache/spark/commit/d9589ab0dbcd6aa443906c3b1ec432d6c68f0587).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-56773664
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20779/consoleFull) for   PR 1977 at commit [`0d3395f`](https://github.com/apache/spark/commit/0d3395f32f587403714ac58c6d4f914ba235ceea).
     * This patch **fails** unit tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class ResultIterable(object):`
      * `class FlattedValuesSerializer(BatchedSerializer):`
      * `class SameKey(object):`
      * `class ChainedIterable(object):`
      * `class GroupByKey(object):`
      * `class ExternalGroupBy(ExternalMerger):`



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-52717422
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18897/consoleFull) for   PR 1977 at commit [`b2dc3bf`](https://github.com/apache/spark/commit/b2dc3bf1885c11ef3b83083c2f4bca048a9e352f).
     * This patch **does not** merge cleanly!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-52379142
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18646/consoleFull) for   PR 1977 at commit [`d05060d`](https://github.com/apache/spark/commit/d05060db533858f31bc4b3ff7a32c24412499187).
     * This patch **passes** unit tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class SameKey(object):`
      * `class GroupByKey(object):`
      * `class ResultIterable(object):`
      * `class ExternalSorter(object):`



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-52828711
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18970/consoleFull) for   PR 1977 at commit [`0a081c6`](https://github.com/apache/spark/commit/0a081c6009df3b4bec48363d30e1402dd5ff11a6).
     * This patch **fails** unit tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class ResultIterable(object):`
      * `class FlattedValuesSerializer(BatchedSerializer):`
      * `class ExternalSorter(object):`
      * `class SameKey(object):`
      * `class GroupByKey(object):`
      * `class ExternalGroupBy(ExternalMerger):`



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1977#discussion_r17678700
  
    --- Diff: python/pyspark/rdd.py ---
    @@ -1562,21 +1560,34 @@ def createZero():
     
             return self.combineByKey(lambda v: func(createZero(), v), func, func, numPartitions)
     
    +    def _can_spill(self):
    +        return self.ctx._conf.get("spark.shuffle.spill", "True").lower() == "true"
    +
    +    def _memory_limit(self):
    +        return _parse_memory(self.ctx._conf.get("spark.python.worker.memory", "512m"))
    +
         # TODO: support variant with custom partitioner
         def groupByKey(self, numPartitions=None):
             """
             Group the values for each key in the RDD into a single sequence.
    -        Hash-partitions the resulting RDD with into numPartitions partitions.
    +        Hash-partitions the resulting RDD with into numPartitions
    +        partitions.
    +
    +        The values in the resulting RDD is iterable object L{ResultIterable},
    +        they can be iterated only once. The `len(values)` will result in
    --- End diff --
    
    This is a change from the old behavior.  Based on our discussion, I guess that we only return a ResultIterable in cases where we spill and still return a list in the non-spilling cases? 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-91347431
  
    I spent a bit of time fuzz-testing this code to try to reach 100% coverage of the changes in this patch.  While doing so, I think I uncovered a bug:
    
    ```
    ../Spark/python/pyspark/shuffle.py:383: in _external_items
        for v in self._merged_items(i):
    ../Spark/python/pyspark/shuffle.py:826: in <genexpr>
        return ((k, vs) for k, vs in GroupByKey(sorted_items))
    _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
    
    self = <pyspark.shuffle.GroupByKey object at 0x1048d0990>
    
        def next(self):
    >       key, value = self.next_item if self.next_item else next(self.iterator)
    E       TypeError: list object is not an iterator
    
    ../Spark/python/pyspark/shuffle.py:669: TypeError
    ```
    
    It looks like the `GroupByKey` object expects to be instantiated with an iterator, but in `GroupBy. _merge_sorted_items` we end up calling it with the output of `ExternalSorter.sorted`.  It looks like there's a branch in `ExternalSorter.sorted` where we can end up returning a list instead of an iterator (line 517), where we return `current_chunk`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-60473742
  
      [Test build #22199 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22199/consoleFull) for   PR 1977 at commit [`ab5515b`](https://github.com/apache/spark/commit/ab5515b4eaf93e95a9d9e04f11538119ca25030b).
     * This patch **fails PySpark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-53848815
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19461/consoleFull) for   PR 1977 at commit [`779ed03`](https://github.com/apache/spark/commit/779ed038e767c2a145a6e5e7a9cee3a01d52a2de).
     * This patch **passes** unit tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class ResultIterable(object):`
      * `class FlattedValuesSerializer(BatchedSerializer):`
      * `class SameKey(object):`
      * `class GroupByKey(object):`
      * `class ExternalGroupBy(ExternalMerger):`



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-91081544
  
    Sorry for my initial confusion regarding the external lists of lists.  I think that the `__len__` thing might be an issue if we ever directly expose `ExternalListOfList` to users, but it looks like we currently only expose it through a `ChainedIterable` in this code, so it doesn't appear to be a problem yet.  This still might be worth addressing if you agree that it could help prevent future bugs if we start using this in more places.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-52599524
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18815/consoleFull) for   PR 1977 at commit [`085aef8`](https://github.com/apache/spark/commit/085aef84f7099e9d9a4a6d30653fdd4a1408ed11).
     * This patch **fails** unit tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class ResultIterable(object):`
      * `class FlattedValuesSerializer(BatchedSerializer):`
      * `class ExternalSorter(object):`
      * `class SameKey(object):`
      * `class GroupByKey(object):`
      * `class ExternalGroupBy(ExternalMerger):`



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-52820243
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18970/consoleFull) for   PR 1977 at commit [`0a081c6`](https://github.com/apache/spark/commit/0a081c6009df3b4bec48363d30e1402dd5ff11a6).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by mkhaitman <gi...@git.apache.org>.
Github user mkhaitman commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-86640421
  
    Tried this out with 1.3-rc3 and was getting FetchFailedExceptions while performing a join between two RDDs:
    
    org.apache.spark.shuffle.FetchFailedException: Error in opening FileSegmentManagedBuffer{file=/tmp/spark-6b363150-d169-4f3e-b703-98048b3dca7f/spark-edaee223-72e4-47f5-a984-64d25bdc34bd/spark-97f52fa9-3fc6-49f4-b1da-660c08e6d826/blockmgr-18bb9336-ca07-43db-ae49-c4be84846eb3/2c/shuffle_2_11_0.data, offset=1193, length=230}
    



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-90697722
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/29802/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-91077067
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/29900/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1977#discussion_r28027829
  
    --- Diff: python/pyspark/shuffle.py ---
    @@ -529,6 +522,322 @@ def sorted(self, iterator, key=None, reverse=False):
             return heapq.merge(chunks, key=key, reverse=reverse)
     
     
    +class ExternalList(object):
    +    """
    +    ExternalList can have many items which cannot be hold in memory in
    +    the same time.
    +
    +    >>> l = ExternalList(range(100))
    +    >>> len(l)
    +    100
    +    >>> l.append(10)
    +    >>> len(l)
    +    101
    +    >>> for i in range(10240):
    +    ...     l.append(i)
    +    >>> len(l)
    +    10341
    +    >>> import pickle
    +    >>> l2 = pickle.loads(pickle.dumps(l))
    +    >>> len(l2)
    +    10341
    +    >>> list(l2)[100]
    +    10
    +    """
    +    LIMIT = 10240
    +
    +    def __init__(self, values):
    +        self.values = values
    +        self.count = len(values)
    +        self._file = None
    +        self._ser = None
    +
    +    def __getstate__(self):
    +        if self._file is not None:
    +            self._file.flush()
    +            f = os.fdopen(os.dup(self._file.fileno()))
    +            f.seek(0)
    +            serialized = f.read()
    +        else:
    +            serialized = ''
    +        return self.values, self.count, serialized
    +
    +    def __setstate__(self, item):
    +        self.values, self.count, serialized = item
    +        if serialized:
    +            self._open_file()
    +            self._file.write(serialized)
    +        else:
    +            self._file = None
    +            self._ser = None
    +
    +    def __iter__(self):
    +        if self._file is not None:
    +            self._file.flush()
    +            # read all items from disks first
    +            with os.fdopen(os.dup(self._file.fileno()), 'r') as f:
    +                f.seek(0)
    +                for values in self._ser.load_stream(f):
    +                    for v in values:
    +                        yield v
    +
    +        for v in self.values:
    +            yield v
    +
    +    def __len__(self):
    +        return self.count
    +
    +    def append(self, value):
    +        self.values.append(value)
    +        self.count += 1
    +        # dump them into disk if the key is huge
    +        if len(self.values) >= self.LIMIT:
    +            self._spill()
    +
    +    def _open_file(self):
    +        dirs = _get_local_dirs("objects")
    +        d = dirs[id(self) % len(dirs)]
    +        if not os.path.exists(d):
    +            os.makedirs(d)
    +        p = os.path.join(d, str(id))
    +        self._file = open(p, "w+", 65536)
    +        self._ser = CompressedSerializer(PickleSerializer())
    +        os.unlink(p)
    +
    +    def _spill(self):
    +        """ dump the values into disk """
    +        global MemoryBytesSpilled, DiskBytesSpilled
    +        if self._file is None:
    +            self._open_file()
    +
    +        used_memory = get_used_memory()
    +        pos = self._file.tell()
    +        self._ser.dump_stream([self.values], self._file)
    --- End diff --
    
    If `self.values` is a collection, why do we need to wrap it into a list prior to dumping it?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-60474811
  
    **[Test build #431 timed out](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/431/consoleFull)**     for PR 1977 at commit [`651f891`](https://github.com/apache/spark/commit/651f8919c0fe8b55967823f1f051ea85e0c4daba)     after a configured wait of `120m`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-60725730
  
      [Test build #484 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/484/consoleFull) for   PR 1977 at commit [`ab5515b`](https://github.com/apache/spark/commit/ab5515b4eaf93e95a9d9e04f11538119ca25030b).
     * This patch **passes all tests**.
     * This patch **does not merge cleanly**.
     * This patch adds the following public classes _(experimental)_:
      * `class ResultIterable(object):`
      * `class FlattedValuesSerializer(BatchedSerializer):`
      * `class ExternalList(object):`
      * `class GroupByKey(object):`
      * `class ChainedIterable(object):`
      * `class ExternalGroupBy(ExternalMerger):`



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-60473744
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22199/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-86686865
  
    Do you hit this error without the patch? I have no idea on why they are related.
    
    On Thursday, March 26, 2015 at 10:43 AM, mkhaitman wrote:
    
    > Tried this out with 1.3-rc3 and was getting FetchFailedExceptions while performing a join between two RDDs:
    > org.apache.spark.shuffle.FetchFailedException: Error in opening FileSegmentManagedBuffer{file=/tmp/spark-6b363150-d169-4f3e-b703-98048b3dca7f/spark-edaee223-72e4-47f5-a984-64d25bdc34bd/spark-97f52fa9-3fc6-49f4-b1da-660c08e6d826/blockmgr-18bb9336-ca07-43db-ae49-c4be84846eb3/2c/shuffle_2_11_0.data, offset=1193, length=230}
    >  
    > —
    > Reply to this email directly or view it on GitHub (https://github.com/apache/spark/pull/1977#issuecomment-86640421).
    >  
    >  
    >  
    >  
    >  
    
    



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-91115081
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/29921/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-52712146
  
    cc @JoshRosen @mateiz 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-52725175
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18908/consoleFull) for   PR 1977 at commit [`f157fe7`](https://github.com/apache/spark/commit/f157fe7954502b72c420e0b96201b3d72a5365cd).
     * This patch **passes** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-90750781
  
      [Test build #29816 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29816/consoleFull) for   PR 1977 at commit [`e3b8eab`](https://github.com/apache/spark/commit/e3b8eab0677d39eed0476f5cf1a9f3e4ad963bdc).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-91062845
  
      [Test build #29900 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29900/consoleFull) for   PR 1977 at commit [`0b0fde8`](https://github.com/apache/spark/commit/0b0fde8b568141b03d7ba8b7d74e5b885dcb4557).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-58109293
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21348/consoleFull) for   PR 1977 at commit [`1578f2e`](https://github.com/apache/spark/commit/1578f2e722b99732d9455186bcf5c085670fb8a3).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1977#discussion_r28009944
  
    --- Diff: python/pyspark/shuffle.py ---
    @@ -529,6 +520,295 @@ def sorted(self, iterator, key=None, reverse=False):
             return heapq.merge(chunks, key=key, reverse=reverse)
     
     
    +class ExternalList(object):
    +    """
    +    ExternalList can have many items which cannot be hold in memory in
    +    the same time.
    +
    +    >>> l = ExternalList(range(100))
    +    >>> len(l)
    +    100
    +    >>> l.append(10)
    +    >>> len(l)
    +    101
    +    >>> for i in range(10240):
    +    ...     l.append(i)
    +    >>> len(l)
    +    10341
    +    >>> import pickle
    +    >>> l2 = pickle.loads(pickle.dumps(l))
    +    >>> len(l2)
    +    10341
    +    >>> list(l2)[100]
    +    10
    +    """
    +    LIMIT = 10240
    +
    +    def __init__(self, values):
    +        self.values = values
    +        self.disk_count = 0
    +        self._file = None
    +        self._ser = None
    +
    +    def __getstate__(self):
    +        if self._file is not None:
    +            self._file.flush()
    +            f = os.fdopen(os.dup(self._file.fileno()))
    +            f.seek(0)
    +            bytes = f.read()
    +        else:
    +            bytes = ''
    +        return self.values, self.disk_count, bytes
    +
    +    def __setstate__(self, item):
    +        self.values, self.disk_count, bytes = item
    +        if bytes:
    +            self._open_file()
    +            self._file.write(bytes)
    +        else:
    +            self._file = None
    +            self._ser = None
    +
    +    def __iter__(self):
    +        if self._file is not None:
    +            self._file.flush()
    +            # read all items from disks first
    +            with os.fdopen(os.dup(self._file.fileno()), 'r') as f:
    +                f.seek(0)
    +                for values in self._ser.load_stream(f):
    +                    for v in values:
    +                        yield v
    +
    +        for v in self.values:
    +            yield v
    +
    +    def __len__(self):
    +        return self.disk_count + len(self.values)
    +
    +    def append(self, value):
    +        self.values.append(value)
    +        # dump them into disk if the key is huge
    +        if len(self.values) >= self.LIMIT:
    +            self._spill()
    +
    +    def _open_file(self):
    +        dirs = _get_local_dirs("objects")
    +        d = dirs[id(self) % len(dirs)]
    +        if not os.path.exists(d):
    +            os.makedirs(d)
    +        p = os.path.join(d, str(id))
    +        self._file = open(p, "w+", 65536)
    +        self._ser = CompressedSerializer(PickleSerializer())
    +        os.unlink(p)
    --- End diff --
    
    We want to remove the file once it's closed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-91038005
  
    @JoshRosen Also, I had rollback the changes in ResultIterable (because some one is using ResultIterable.maxindex), and improve the performance of len(ResultIterable) for ExternalList, please take another look. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-91376129
  
      [Test build #29967 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29967/consoleFull) for   PR 1977 at commit [`af3713a`](https://github.com/apache/spark/commit/af3713a1ee73cc0b4fe43e61dd82eab7924aa379).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class FlattenedValuesSerializer(BatchedSerializer):`
      * `class ExternalList(object):`
      * `class ExternalListOfList(ExternalList):`
      * `class GroupByKey(object):`
      * `class ExternalGroupBy(ExternalMerger):`
    
     * This patch does not change any dependencies.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-58259787
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21397/Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-62460909
  
      [Test build #23156 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/23156/consoleFull) for   PR 1977 at commit [`70aadcd`](https://github.com/apache/spark/commit/70aadcdba2952c38e3ce531ffd25761b553c1369).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `public final class JavaRecoverableNetworkWordCount `
      * `class ResultIterable(object):`
      * `class FlattedValuesSerializer(BatchedSerializer):`
      * `class ExternalList(object):`
      * `class GroupByKey(object):`
      * `class ChainedIterable(object):`
      * `class ExternalGroupBy(ExternalMerger):`



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-56780807
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/20781/


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-61682581
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22878/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1977#discussion_r28027864
  
    --- Diff: python/pyspark/shuffle.py ---
    @@ -529,6 +522,322 @@ def sorted(self, iterator, key=None, reverse=False):
             return heapq.merge(chunks, key=key, reverse=reverse)
     
     
    +class ExternalList(object):
    +    """
    +    ExternalList can have many items which cannot be hold in memory in
    +    the same time.
    +
    +    >>> l = ExternalList(range(100))
    +    >>> len(l)
    +    100
    +    >>> l.append(10)
    +    >>> len(l)
    +    101
    +    >>> for i in range(10240):
    +    ...     l.append(i)
    +    >>> len(l)
    +    10341
    +    >>> import pickle
    +    >>> l2 = pickle.loads(pickle.dumps(l))
    +    >>> len(l2)
    +    10341
    +    >>> list(l2)[100]
    +    10
    +    """
    +    LIMIT = 10240
    +
    +    def __init__(self, values):
    +        self.values = values
    +        self.count = len(values)
    +        self._file = None
    +        self._ser = None
    +
    +    def __getstate__(self):
    +        if self._file is not None:
    +            self._file.flush()
    +            f = os.fdopen(os.dup(self._file.fileno()))
    +            f.seek(0)
    +            serialized = f.read()
    +        else:
    +            serialized = ''
    +        return self.values, self.count, serialized
    +
    +    def __setstate__(self, item):
    +        self.values, self.count, serialized = item
    +        if serialized:
    +            self._open_file()
    +            self._file.write(serialized)
    +        else:
    +            self._file = None
    +            self._ser = None
    +
    +    def __iter__(self):
    +        if self._file is not None:
    +            self._file.flush()
    +            # read all items from disks first
    +            with os.fdopen(os.dup(self._file.fileno()), 'r') as f:
    +                f.seek(0)
    +                for values in self._ser.load_stream(f):
    +                    for v in values:
    --- End diff --
    
    I understand now that this is just unwrapping the list that we wrapped around `self.values` in the `_spill` call down on line 615, but this was a bit confusing to me.  If we need to keep this and the wrapping, could you add comments to explain it?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-52393277
  
    Jenkins, retest this please.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-68400955
  
      [Test build #557 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/557/consoleFull) for   PR 1977 at commit [`2b9c261`](https://github.com/apache/spark/commit/2b9c261497fab4bb8d310d30e0017aa3b3d2f37f).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1977#discussion_r28028089
  
    --- Diff: python/pyspark/shuffle.py ---
    @@ -529,6 +522,322 @@ def sorted(self, iterator, key=None, reverse=False):
             return heapq.merge(chunks, key=key, reverse=reverse)
     
     
    +class ExternalList(object):
    +    """
    +    ExternalList can have many items which cannot be hold in memory in
    +    the same time.
    +
    +    >>> l = ExternalList(range(100))
    +    >>> len(l)
    +    100
    +    >>> l.append(10)
    +    >>> len(l)
    +    101
    +    >>> for i in range(10240):
    +    ...     l.append(i)
    +    >>> len(l)
    +    10341
    +    >>> import pickle
    +    >>> l2 = pickle.loads(pickle.dumps(l))
    +    >>> len(l2)
    +    10341
    +    >>> list(l2)[100]
    +    10
    +    """
    +    LIMIT = 10240
    +
    +    def __init__(self, values):
    +        self.values = values
    +        self.count = len(values)
    +        self._file = None
    +        self._ser = None
    +
    +    def __getstate__(self):
    +        if self._file is not None:
    +            self._file.flush()
    +            f = os.fdopen(os.dup(self._file.fileno()))
    +            f.seek(0)
    +            serialized = f.read()
    +        else:
    +            serialized = ''
    +        return self.values, self.count, serialized
    +
    +    def __setstate__(self, item):
    +        self.values, self.count, serialized = item
    +        if serialized:
    +            self._open_file()
    +            self._file.write(serialized)
    +        else:
    +            self._file = None
    +            self._ser = None
    +
    +    def __iter__(self):
    +        if self._file is not None:
    +            self._file.flush()
    +            # read all items from disks first
    +            with os.fdopen(os.dup(self._file.fileno()), 'r') as f:
    +                f.seek(0)
    +                for values in self._ser.load_stream(f):
    +                    for v in values:
    +                        yield v
    +
    +        for v in self.values:
    +            yield v
    +
    +    def __len__(self):
    +        return self.count
    +
    +    def append(self, value):
    +        self.values.append(value)
    +        self.count += 1
    +        # dump them into disk if the key is huge
    +        if len(self.values) >= self.LIMIT:
    +            self._spill()
    +
    +    def _open_file(self):
    +        dirs = _get_local_dirs("objects")
    +        d = dirs[id(self) % len(dirs)]
    +        if not os.path.exists(d):
    +            os.makedirs(d)
    +        p = os.path.join(d, str(id))
    +        self._file = open(p, "w+", 65536)
    +        self._ser = CompressedSerializer(PickleSerializer())
    +        os.unlink(p)
    +
    +    def _spill(self):
    +        """ dump the values into disk """
    +        global MemoryBytesSpilled, DiskBytesSpilled
    +        if self._file is None:
    +            self._open_file()
    +
    +        used_memory = get_used_memory()
    +        pos = self._file.tell()
    +        self._ser.dump_stream([self.values], self._file)
    --- End diff --
    
    Aha, I see that this because we don't use a batching serializer.  This makes sense now.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-53522590
  
    **Tests timed out** after a configured wait of `120m`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-91077061
  
      [Test build #29900 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29900/consoleFull) for   PR 1977 at commit [`0b0fde8`](https://github.com/apache/spark/commit/0b0fde8b568141b03d7ba8b7d74e5b885dcb4557).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class FlattenedValuesSerializer(BatchedSerializer):`
      * `class ExternalList(object):`
      * `class ExternalListOfList(ExternalList):`
      * `class GroupByKey(object):`
      * `class GroupListsByKey(GroupByKey):`
      * `class ChainedIterable(object):`
      * `class ExternalGroupBy(ExternalMerger):`
    
     * This patch does not change any dependencies.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-61338247
  
    Jenkins, retest this please.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-52860133
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19014/consoleFull) for   PR 1977 at commit [`acd8e1b`](https://github.com/apache/spark/commit/acd8e1b8f920711ea57fc2bb75c1749c077b372e).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-68388416
  
      [Test build #24904 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24904/consoleFull) for   PR 1977 at commit [`2b9c261`](https://github.com/apache/spark/commit/2b9c261497fab4bb8d310d30e0017aa3b3d2f37f).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-58115496
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21348/Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [WIP] [SPARK-3074] [PySpark] support groupByKe...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-52595505
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18815/consoleFull) for   PR 1977 at commit [`085aef8`](https://github.com/apache/spark/commit/085aef84f7099e9d9a4a6d30653fdd4a1408ed11).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-52394905
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18674/consoleFull) for   PR 1977 at commit [`b40bae7`](https://github.com/apache/spark/commit/b40bae770c850e9a8a5e742af850100a659c7949).
     * This patch **passes** unit tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class SameKey(object):`
      * `class GroupByKey(object):`
      * `class ResultIterable(object):`
      * `class FlattedValuesSerializer(BatchedSerializer):`
      * `class ExternalSorter(object):`



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-91115066
  
      [Test build #29921 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29921/consoleFull) for   PR 1977 at commit [`e78c15c`](https://github.com/apache/spark/commit/e78c15c41ff0c1cf393bb6e65eb513ebd8357c75).
     * This patch **fails PySpark unit tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class FlattenedValuesSerializer(BatchedSerializer):`
      * `class ExternalList(object):`
      * `class ExternalListOfList(ExternalList):`
      * `class GroupByKey(object):`
      * `class ExternalGroupBy(ExternalMerger):`
    
     * This patch does not change any dependencies.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-56002944
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20530/consoleFull) for   PR 1977 at commit [`47918b8`](https://github.com/apache/spark/commit/47918b83b21674347cbd742d40e13ee36e9e58ee).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-55519113
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20305/consoleFull) for   PR 1977 at commit [`fbc504a`](https://github.com/apache/spark/commit/fbc504a0e643ebf79de26ccc07e012e2a0603a0c).
     * This patch **passes** unit tests.
     * This patch **does not** merge cleanly!



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-90718319
  
      [Test build #638 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/638/consoleFull) for   PR 1977 at commit [`2a1857a`](https://github.com/apache/spark/commit/2a1857a4c1d72bee5a93ffbe1da013ffc7ff673b).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-91011683
  
    I spent most of the morning looking this over again and the patch looks pretty good to me.  I think I understand the lifecycle of values pretty well.  I left a couple of questions / comments upthread, but overall I think this is pretty close to being ready to merge.
    
    One quick question: when do groupBy's spill files get cleaned up?  It seems like we want them to be cleaned at the end of the task, since at that point we know for sure that they won't be re-used.  To handle this case, what do you think about adding a cleanup hook mechanism that allows us to register cleanup code to run at the end of a task?  In the past, we could have relied on shutdown hooks for this, but that's no longer possible due to worker re-use.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-91051963
  
    Jenkins, retest this please.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-53648492
  
    @mateiz @JoshRosen I think this PR is ready for review, it helped user to do groupByKey() over 120G dataset with the hottest key which has more than 80 millions values.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-53844406
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19461/consoleFull) for   PR 1977 at commit [`779ed03`](https://github.com/apache/spark/commit/779ed038e767c2a145a6e5e7a9cee3a01d52a2de).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-53844028
  
    Jenkins, retest this please.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-90737032
  
      [Test build #638 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/638/consoleFull) for   PR 1977 at commit [`2a1857a`](https://github.com/apache/spark/commit/2a1857a4c1d72bee5a93ffbe1da013ffc7ff673b).
     * This patch **fails PySpark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.
     * This patch does not change any dependencies.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-61338701
  
      [Test build #22651 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22651/consoleFull) for   PR 1977 at commit [`ab5515b`](https://github.com/apache/spark/commit/ab5515b4eaf93e95a9d9e04f11538119ca25030b).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-62448001
  
      [Test build #23156 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/23156/consoleFull) for   PR 1977 at commit [`70aadcd`](https://github.com/apache/spark/commit/70aadcdba2952c38e3ce531ffd25761b553c1369).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-91037582
  
    @JoshRosen Thanks for review this, this may be the most complicated part in PySpark. :(
    
    For partitioned file, they will be cleaned up after merging, partition by partition. The sorted file and file for large list (ExternalList) will be cleaned up once it's closed by GC (os.unlink() after open it).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-52386192
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18667/consoleFull) for   PR 1977 at commit [`efa23df`](https://github.com/apache/spark/commit/efa23df9a82c02169bd526e7f99d30c7f6a95de2).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-61345056
  
      [Test build #22651 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22651/consoleFull) for   PR 1977 at commit [`ab5515b`](https://github.com/apache/spark/commit/ab5515b4eaf93e95a9d9e04f11538119ca25030b).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class ResultIterable(object):`
      * `class FlattedValuesSerializer(BatchedSerializer):`
      * `class ExternalList(object):`
      * `class GroupByKey(object):`
      * `class ChainedIterable(object):`
      * `class ExternalGroupBy(ExternalMerger):`



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-52865158
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19015/consoleFull) for   PR 1977 at commit [`85138e6`](https://github.com/apache/spark/commit/85138e6debc7bccd75578c2ffdf030c3d483ce4e).
     * This patch **passes** unit tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `case class SparkListenerTaskStart(stageId: Int, stageAttemptId: Int, taskInfo: TaskInfo)`
      * `class ResultIterable(object):`
      * `class FlattedValuesSerializer(BatchedSerializer):`
      * `class ExternalSorter(object):`
      * `class SameKey(object):`
      * `class GroupByKey(object):`
      * `class ExternalGroupBy(ExternalMerger):`



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-55928306
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/125/consoleFull) for   PR 1977 at commit [`4d4bc86`](https://github.com/apache/spark/commit/4d4bc8671a4ef7e9d2d9924681bed1f8e4695a20).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1977#discussion_r17680613
  
    --- Diff: python/pyspark/rdd.py ---
    @@ -1562,21 +1560,34 @@ def createZero():
     
             return self.combineByKey(lambda v: func(createZero(), v), func, func, numPartitions)
     
    +    def _can_spill(self):
    +        return self.ctx._conf.get("spark.shuffle.spill", "True").lower() == "true"
    +
    +    def _memory_limit(self):
    +        return _parse_memory(self.ctx._conf.get("spark.python.worker.memory", "512m"))
    +
         # TODO: support variant with custom partitioner
         def groupByKey(self, numPartitions=None):
             """
             Group the values for each key in the RDD into a single sequence.
    -        Hash-partitions the resulting RDD with into numPartitions partitions.
    +        Hash-partitions the resulting RDD with into numPartitions
    +        partitions.
    +
    +        The values in the resulting RDD is iterable object L{ResultIterable},
    +        they can be iterated only once. The `len(values)` will result in
    --- End diff --
    
    Actually, I double-checked and this is just a documentation change, since the previous version of groupByKey() behaves the same way.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1977#discussion_r17681171
  
    --- Diff: python/pyspark/rdd.py ---
    @@ -1588,8 +1599,27 @@ def mergeCombiners(a, b):
                 a.extend(b)
                 return a
     
    -        return self.combineByKey(createCombiner, mergeValue, mergeCombiners,
    -                                 numPartitions).mapValues(lambda x: ResultIterable(x))
    +        spill = self._can_spill()
    +        memory = self._memory_limit()
    +        serializer = self._jrdd_deserializer
    +        agg = Aggregator(createCombiner, mergeValue, mergeCombiners)
    +
    +        def combine(iterator):
    +            merger = ExternalMerger(agg, memory * 0.9, serializer) \
    +                if spill else InMemoryMerger(agg)
    +            merger.mergeValues(iterator)
    +            return merger.iteritems()
    +
    +        locally_combined = self.mapPartitions(combine)
    +        shuffled = locally_combined.partitionBy(numPartitions)
    +
    +        def groupByKey(it):
    +            merger = ExternalGroupBy(agg, memory, serializer)\
    +                if spill else InMemoryMerger(agg)
    +            merger.mergeCombiners(it)
    +            return merger.iteritems()
    +
    +        return shuffled.mapPartitions(groupByKey).mapValues(ResultIterable)
    --- End diff --
    
    Should we set `preservesPartitioning=True` on this `mapPartitions` call, similar to what `combineByKey` does in its final step?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-60472577
  
      [Test build #431 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/431/consoleFull) for   PR 1977 at commit [`651f891`](https://github.com/apache/spark/commit/651f8919c0fe8b55967823f1f051ea85e0c4daba).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by mkhaitman <gi...@git.apache.org>.
Github user mkhaitman commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1977#discussion_r27223983
  
    --- Diff: python/pyspark/shuffle.py ---
    @@ -244,72 +258,57 @@ def _next_limit(self):
     
         def mergeValues(self, iterator):
             """ Combine the items by creator and combiner """
    -        iterator = iter(iterator)
             # speedup attribute lookup
             creator, comb = self.agg.createCombiner, self.agg.mergeValue
    -        d, c, batch = self.data, 0, self.batch
    +        c, data, pdata, hfun, batch = 0, self.data, self.pdata, self._partition, self.batch
    +        limit = self.memory_limit
     
             for k, v in iterator:
    +            d = pdata[hfun(k)] if pdata else data
                 d[k] = comb(d[k], v) if k in d else creator(v)
     
                 c += 1
    -            if c % batch == 0 and get_used_memory() > self.memory_limit:
    -                self._spill()
    -                self._partitioned_mergeValues(iterator, self._next_limit())
    -                break
    +            if c >= batch:
    +                if get_used_memory() >= limit:
    +                    self._spill()
    +                    limit = self._next_limit()
    +                    batch /= 2
    +                    c = 0
    +                else:
    +                    batch *= 1.5
     
         def _partition(self, key):
             """ Return the partition for key """
             return hash((key, self._seed)) % self.partitions
     
    -    def _partitioned_mergeValues(self, iterator, limit=0):
    -        """ Partition the items by key, then combine them """
    -        # speedup attribute lookup
    -        creator, comb = self.agg.createCombiner, self.agg.mergeValue
    -        c, pdata, hfun, batch = 0, self.pdata, self._partition, self.batch
    -
    -        for k, v in iterator:
    -            d = pdata[hfun(k)]
    -            d[k] = comb(d[k], v) if k in d else creator(v)
    -            if not limit:
    -                continue
    -
    -            c += 1
    -            if c % batch == 0 and get_used_memory() > limit:
    -                self._spill()
    -                limit = self._next_limit()
    +    def _object_size(self, obj):
    +        """ How much of memory for this obj, assume that all the objects
    +        consume similar bytes of memory
    +        """
    +        return 1
     
    -    def mergeCombiners(self, iterator, check=True):
    +    def mergeCombiners(self, iterator, limit=None):
             """ Merge (K,V) pair by mergeCombiner """
    -        iterator = iter(iterator)
    +        if limit is None:
    +            limit = self.memory_limit
             # speedup attribute lookup
    -        d, comb, batch = self.data, self.agg.mergeCombiners, self.batch
    -        c = 0
    -        for k, v in iterator:
    -            d[k] = comb(d[k], v) if k in d else v
    -            if not check:
    -                continue
    -
    -            c += 1
    -            if c % batch == 0 and get_used_memory() > self.memory_limit:
    -                self._spill()
    -                self._partitioned_mergeCombiners(iterator, self._next_limit())
    -                break
    -
    -    def _partitioned_mergeCombiners(self, iterator, limit=0):
    -        """ Partition the items by key, then merge them """
    -        comb, pdata = self.agg.mergeCombiners, self.pdata
    -        c, hfun = 0, self._partition
    +        comb, hfun, objsize = self.agg.mergeCombiners, self._partition, self._object_size
    +        c, data, pdata, batch = 0, self.data, self.pdata, self.batch
             for k, v in iterator:
    -            d = pdata[hfun(k)]
    +            d = pdata[hfun(k)] if pdata else data
                 d[k] = comb(d[k], v) if k in d else v
                 if not limit:
                     continue
     
    -            c += 1
    -            if c % self.batch == 0 and get_used_memory() > limit:
    -                self._spill()
    -                limit = self._next_limit()
    +            c += objsize(v)
    +            if c > batch:
    +                if get_used_memory() > limit:
    +                    self._spill()
    +                    limit = self._next_limit()
    +                    batch /= 2
    +                    c = 0
    +                else:
    +                    batch *= 1.5
     
    --- End diff --
    
    Would it make sense to do a final memory check after the for loop, in case we just added another 999 items (worst case) without spilling any of it to disk?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-60722423
  
      [Test build #22348 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22348/consoleFull) for   PR 1977 at commit [`ab5515b`](https://github.com/apache/spark/commit/ab5515b4eaf93e95a9d9e04f11538119ca25030b).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class ResultIterable(object):`
      * `class FlattedValuesSerializer(BatchedSerializer):`
      * `class ExternalList(object):`
      * `class GroupByKey(object):`
      * `class ChainedIterable(object):`
      * `class ExternalGroupBy(ExternalMerger):`



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-91121784
  
      [Test build #29925 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29925/consoleFull) for   PR 1977 at commit [`67772dd`](https://github.com/apache/spark/commit/67772dd9e5fc2157f1da960aebfa82b133615527).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by sryza <gi...@git.apache.org>.
Github user sryza commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1977#discussion_r20132241
  
    --- Diff: python/pyspark/shuffle.py ---
    @@ -520,6 +505,295 @@ def sorted(self, iterator, key=None, reverse=False):
             return heapq.merge(chunks, key=key, reverse=reverse)
     
     
    +class ExternalList(object):
    +    """
    +    ExternalList can have many items which cannot be hold in memory in
    +    the same time.
    +
    +    >>> l = ExternalList(range(100))
    +    >>> len(l)
    +    100
    +    >>> l.append(10)
    +    >>> len(l)
    +    101
    +    >>> for i in range(10240):
    +    ...     l.append(i)
    +    >>> len(l)
    +    10341
    +    >>> import pickle
    +    >>> l2 = pickle.loads(pickle.dumps(l))
    +    >>> len(l2)
    +    10341
    +    >>> list(l2)[100]
    +    10
    +    """
    +    LIMIT = 10240
    +
    +    def __init__(self, values):
    +        self.values = values
    +        self.disk_count = 0
    +        self._file = None
    +        self._ser = None
    +
    +    def __getstate__(self):
    +        if self._file is not None:
    +            self._file.flush()
    +            f = os.fdopen(os.dup(self._file.fileno()))
    +            f.seek(0)
    +            bytes = f.read()
    +        else:
    +            bytes = ''
    +        return self.values, self.disk_count, bytes
    +
    +    def __setstate__(self, item):
    +        self.values, self.disk_count, bytes = item
    +        if bytes:
    +            self._open_file()
    +            self._file.write(bytes)
    +        else:
    +            self._file = None
    +            self._ser = None
    +
    +    def __iter__(self):
    +        if self._file is not None:
    +            self._file.flush()
    +            # read all items from disks first
    +            with os.fdopen(os.dup(self._file.fileno()), 'r') as f:
    +                f.seek(0)
    +                for values in self._ser.load_stream(f):
    +                    for v in values:
    +                        yield v
    +
    +        for v in self.values:
    +            yield v
    +
    +    def __len__(self):
    +        return self.disk_count + len(self.values)
    +
    +    def append(self, value):
    +        self.values.append(value)
    +        # dump them into disk if the key is huge
    +        if len(self.values) >= self.LIMIT:
    +            self._spill()
    +
    +    def _open_file(self):
    +        dirs = _get_local_dirs("objects")
    +        d = dirs[id(self) % len(dirs)]
    +        if not os.path.exists(d):
    +            os.makedirs(d)
    +        p = os.path.join(d, str(id))
    +        self._file = open(p, "w+", 65536)
    +        self._ser = CompressedSerializer(PickleSerializer())
    +        os.unlink(p)
    +
    +    def _spill(self):
    +        """ dump the values into disk """
    +        global MemoryBytesSpilled, DiskBytesSpilled
    +        if self._file is None:
    +            self._open_file()
    +
    +        used_memory = get_used_memory()
    +        pos = self._file.tell()
    +        self._ser.dump_stream([self.values], self._file)
    +        self.disk_count += len(self.values)
    +        self.values = []
    +        gc.collect()
    +        DiskBytesSpilled += self._file.tell() - pos
    +        MemoryBytesSpilled += (used_memory - get_used_memory()) << 20
    +
    +
    +class GroupByKey(object):
    +    """
    +    group a sorted iterator into [(k1, it1), (k2, it2), ...]
    +
    +    >>> k = [i/3 for i in range(6)]
    +    >>> v = [i for i in range(6)]
    +    >>> g = GroupByKey(iter(zip(k, v)))
    +    >>> [(k, list(it)) for k, it in g]
    +    [(0, [0, 1, 2]), (1, [3, 4, 5])]
    +    """
    +    def __init__(self, iterator):
    +        self.iterator = iterator
    +        self.next_item = None
    +
    +    def __iter__(self):
    +        return self
    +
    +    def next(self):
    +        key, value = self.next_item if self.next_item else next(self.iterator)
    +        values = ExternalList([value])
    +        try:
    +            while True:
    +                k, v = next(self.iterator)
    +                if k != key:
    +                    self.next_item = (k, v)
    +                    break
    +                values.append(v)
    +        except StopIteration:
    +            self.next_item = None
    +        return key, values
    +
    +
    +class ChainedIterable(object):
    +    """
    +    Picklable chained iterator, similar to itertools.chain.fromiterable()
    +    """
    +    def __init__(self, iterators):
    +        self.iterators = iterators
    +
    +    def __len__(self):
    +        return sum(len(vs) for vs in self.iterators)
    +
    +    def __iter__(self):
    +        return itertools.chain.fromiterable(self.iterators)
    +
    +
    +class ExternalGroupBy(ExternalMerger):
    +
    +    """
    +    Group by the items by key. If any partition of them can not been
    +    hold in memory, it will do sort based group by.
    +
    +    This class works as follows:
    +
    +    - It repeatedly group the items by key and save them in one dict in
    +      memory.
    +
    +    - When the used memory goes above memory limit, it will split
    +      the combined data into partitions by hash code, dump them
    +      into disk, one file per partition. If the number of keys
    +      in one partitions is smaller than 1000, it will sort them
    +      by key before dumping into disk.
    +
    +    - Then it goes through the rest of the iterator, group items
    +      by key into different dict by hash. Until the used memory goes over
    +      memory limit, it dump all the dicts into disks, one file per
    +      dict. Repeat this again until combine all the items. It
    +      also will try to sort the items by key in each partition
    +      before dumping into disks.
    +
    +    - It will yield the grouped items partitions by partitions.
    +      If the data in one partitions can be hold in memory, then it
    +      will load and combine them in memory and yield.
    +
    +    - If the dataset in one partittion cannot be hold in memory,
    --- End diff --
    
    nit: partition


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-52860780
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19015/consoleFull) for   PR 1977 at commit [`85138e6`](https://github.com/apache/spark/commit/85138e6debc7bccd75578c2ffdf030c3d483ce4e).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1977#discussion_r28003896
  
    --- Diff: python/pyspark/shuffle.py ---
    @@ -367,32 +372,13 @@ def iteritems(self):
     
         def _external_items(self):
             """ Return all partitioned items as iterator """
    -        assert not self.data
             if any(self.pdata):
                 self._spill()
    -        hard_limit = self._next_limit()
    +        self.pdata = []
    --- End diff --
    
    It looks like the old code didn't clear `pdata` after spilling, but I'm guessing that wasn't a problem because we didn't iterate multiple times before?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-52603829
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18823/consoleFull) for   PR 1977 at commit [`11ba318`](https://github.com/apache/spark/commit/11ba318a39cda531894f85b318415d74dfd1b082).
     * This patch **fails** unit tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `  case class Params(input: String = "data/mllib/sample_linear_regression_data.txt")`
      * `  case class Params(input: String = "data/mllib/sample_linear_regression_data.txt")`
      * `  case class Params(input: String = "data/mllib/sample_binary_classification_data.txt")`
      * `class ResultIterable(object):`
      * `class FlattedValuesSerializer(BatchedSerializer):`
      * `class ExternalSorter(object):`
      * `class SameKey(object):`
      * `class GroupByKey(object):`
      * `class ExternalGroupBy(ExternalMerger):`



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-52378307
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18642/consoleFull) for   PR 1977 at commit [`083d842`](https://github.com/apache/spark/commit/083d842b2e784674d2ec6823ac4da81140ac48cd).
     * This patch **passes** unit tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class SameKey(object):`
      * `class GroupByKey(object):`
      * `class ResultIterable(object):`
      * `class ExternalSorter(object):`



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-91384867
  
    LGTM.  I spent more time testing this locally, commenting out various memory threshold flags as necessary in order to get good branch coverage, and didn't find any new problems.  We should definitely do performance benchmarking of this feature during the 1.4 QA period in order to quantify its impact, but that isn't a blocker to merging this now.  If this does turn out to have any performance issues for certain workloads, users should be able to feature-flag it by configuring Spark with a higher spilling threshold (or we could introduce a new flag specifically to bypass this).
    
    I'm going to merge this into `master` (1.4.0).   Thanks!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-60718263
  
      [Test build #22348 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22348/consoleFull) for   PR 1977 at commit [`ab5515b`](https://github.com/apache/spark/commit/ab5515b4eaf93e95a9d9e04f11538119ca25030b).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-52721772
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18908/consoleFull) for   PR 1977 at commit [`f157fe7`](https://github.com/apache/spark/commit/f157fe7954502b72c420e0b96201b3d72a5365cd).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-55978000
  
    Can you guys also add some tests that do rdd.groupByKey().filter().map(), and skip some of the groups? As well as tests that iterate over the values in a SameKey object twice? (Our contract is that you can iterate over them as many times as you'd like).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1977#discussion_r28003603
  
    --- Diff: python/pyspark/shuffle.py ---
    @@ -367,32 +372,13 @@ def iteritems(self):
     
         def _external_items(self):
             """ Return all partitioned items as iterator """
    -        assert not self.data
    --- End diff --
    
    I noticed that you moved a few of these assertions.  I guess the old assumption was that once we've spilled, we'll stop using `data` and only aggregate into `pdata`, given that we clear `data` in the first branch of `_spill`.  Why has this assumption changed here?  It looks like we do end up writing to `data` again inside of this `_external_items` method, but then we end up clearing `data` at the end after the iterator has been consumed.
    
    Was this change necessary in order to support iterating multiple times over the merged result?  Just want to double-check my understanding here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1977#discussion_r28019825
  
    --- Diff: python/pyspark/shuffle.py ---
    @@ -529,6 +522,301 @@ def sorted(self, iterator, key=None, reverse=False):
             return heapq.merge(chunks, key=key, reverse=reverse)
     
     
    +class ExternalList(object):
    +    """
    +    ExternalList can have many items which cannot be hold in memory in
    +    the same time.
    +
    +    >>> l = ExternalList(range(100))
    +    >>> len(l)
    +    100
    +    >>> l.append(10)
    +    >>> len(l)
    +    101
    +    >>> for i in range(10240):
    +    ...     l.append(i)
    +    >>> len(l)
    +    10341
    +    >>> import pickle
    +    >>> l2 = pickle.loads(pickle.dumps(l))
    +    >>> len(l2)
    +    10341
    +    >>> list(l2)[100]
    +    10
    +    """
    +    LIMIT = 10240
    +
    +    def __init__(self, values):
    +        self.values = values
    +        if values and isinstance(values[0], list):
    +            self.count = sum(len(i) for i in values)
    +        else:
    +            self.count = len(values)
    +        self._file = None
    +        self._ser = None
    +
    +    def __getstate__(self):
    +        if self._file is not None:
    +            self._file.flush()
    +            f = os.fdopen(os.dup(self._file.fileno()))
    +            f.seek(0)
    +            serialized = f.read()
    +        else:
    +            serialized = ''
    +        return self.values, self.count, serialized
    +
    +    def __setstate__(self, item):
    +        self.values, self.count, serialized = item
    +        if serialized:
    +            self._open_file()
    +            self._file.write(serialized)
    +        else:
    +            self._file = None
    +            self._ser = None
    +
    +    def __iter__(self):
    +        if self._file is not None:
    +            self._file.flush()
    +            # read all items from disks first
    +            with os.fdopen(os.dup(self._file.fileno()), 'r') as f:
    +                f.seek(0)
    +                for values in self._ser.load_stream(f):
    +                    for v in values:
    +                        yield v
    +
    +        for v in self.values:
    +            yield v
    +
    +    def __len__(self):
    +        return self.count
    +
    +    def append(self, value):
    +        self.values.append(value)
    +        self.count += len(value) if isinstance(value, list) else 1
    +        # dump them into disk if the key is huge
    +        if len(self.values) >= self.LIMIT:
    +            self._spill()
    +
    +    def _open_file(self):
    +        dirs = _get_local_dirs("objects")
    +        d = dirs[id(self) % len(dirs)]
    +        if not os.path.exists(d):
    +            os.makedirs(d)
    +        p = os.path.join(d, str(id))
    +        self._file = open(p, "w+", 65536)
    +        self._ser = CompressedSerializer(PickleSerializer())
    +        os.unlink(p)
    +
    +    def _spill(self):
    +        """ dump the values into disk """
    +        global MemoryBytesSpilled, DiskBytesSpilled
    +        if self._file is None:
    +            self._open_file()
    +
    +        used_memory = get_used_memory()
    +        pos = self._file.tell()
    +        self._ser.dump_stream([self.values], self._file)
    +        self.values = []
    +        gc.collect()
    +        DiskBytesSpilled += self._file.tell() - pos
    +        MemoryBytesSpilled += (used_memory - get_used_memory()) << 20
    +
    +
    +class GroupByKey(object):
    +    """
    +    group a sorted iterator into [(k1, it1), (k2, it2), ...]
    +
    +    >>> k = [i/3 for i in range(6)]
    +    >>> v = [i for i in range(6)]
    +    >>> g = GroupByKey(iter(zip(k, v)))
    +    >>> [(k, list(it)) for k, it in g]
    +    [(0, [0, 1, 2]), (1, [3, 4, 5])]
    +    """
    +    def __init__(self, iterator):
    +        self.iterator = iterator
    +        self.next_item = None
    +
    +    def __iter__(self):
    +        return self
    +
    +    def next(self):
    +        key, value = self.next_item if self.next_item else next(self.iterator)
    +        values = ExternalList([value])
    +        try:
    +            while True:
    +                k, v = next(self.iterator)
    +                if k != key:
    +                    self.next_item = (k, v)
    +                    break
    +                values.append(v)
    +        except StopIteration:
    +            self.next_item = None
    +        return key, values
    +
    +
    +class ChainedIterable(object):
    +    """
    +    Picklable chained iterator, similar to itertools.chain.from_iterable()
    +    """
    +    def __init__(self, iterators):
    +        self.iterators = iterators
    +
    +    def __len__(self):
    +        try:
    +            return len(self.iterators)
    +        except:
    --- End diff --
    
    Can this be `except TypeError` instead?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3074] [PySpark] support groupByKey() wi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1977#issuecomment-56776628
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/151/consoleFull) for   PR 1977 at commit [`0d3395f`](https://github.com/apache/spark/commit/0d3395f32f587403714ac58c6d4f914ba235ceea).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org