Posted to reviews@spark.apache.org by davies <gi...@git.apache.org> on 2014/08/22 06:45:04 UTC

[GitHub] spark pull request: [SPARK-2871] [PySpark] add zipWithIndex() and ...

GitHub user davies opened a pull request:

    https://github.com/apache/spark/pull/2092

    [SPARK-2871] [PySpark] add zipWithIndex() and zipWithUniqueId()

    RDD.zipWithIndex()
    
            Zips this RDD with its element indices.
    
            The ordering is first based on the partition index and then the
            ordering of items within each partition. So the first item in
            the first partition gets index 0, and the last item in the last
            partition receives the largest index.
    
            This method needs to trigger a Spark job when this RDD contains
            more than one partition.
    
            >>> sc.parallelize(range(4), 2).zipWithIndex().collect()
            [(0, 0), (1, 1), (2, 2), (3, 3)]
    
    
    RDD.zipWithUniqueId()
    
            Zips this RDD with generated unique Long ids.
    
            Items in the kth partition will get ids k, n+k, 2*n+k, ..., where
            n is the number of partitions. So there may exist gaps, but this
            method won't trigger a Spark job, which is different from
            L{zipWithIndex}.
    
            >>> sc.parallelize(range(4), 2).zipWithUniqueId().collect()
            [(0, 0), (2, 1), (1, 2), (3, 3)]

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/davies/spark zipWith

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/2092.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2092
    
----
commit 0d2a128da7f24214e6714a93f7402aaea93075f2
Author: Davies Liu <da...@gmail.com>
Date:   2014-08-22T04:42:00Z

    add zipWithIndex() and zipWithUniqueId()

----




[GitHub] spark pull request: [SPARK-2871] [PySpark] add zipWithIndex() and ...

Posted by mattf <gi...@git.apache.org>.
Github user mattf commented on the pull request:

    https://github.com/apache/spark/pull/2092#issuecomment-53139571
  
    are you going to add tests for these?




[GitHub] spark pull request: [SPARK-2871] [PySpark] add zipWithIndex() and ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/2092




[GitHub] spark pull request: [SPARK-2871] [PySpark] add zipWithIndex() and ...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2092#discussion_r16634474
  
    --- Diff: python/pyspark/rdd.py ---
    @@ -1715,6 +1715,52 @@ def batch_as(rdd, batchSize):
                                             other._jrdd_deserializer)
             return RDD(pairRDD, self.ctx, deserializer)
     
    +    def zipWithIndex(self):
    +        """
    +        Zips this RDD with its element indices.
    +
    +        The ordering is first based on the partition index and then the
    +        ordering of items within each partition. So the first item in
    +        the first partition gets index 0, and the last item in the last
    +        partition receives the largest index.
    +
    +        This method needs to trigger a Spark job when this RDD contains
    +        more than one partition.
    +
    +        >>> sc.parallelize(range(4), 2).zipWithIndex().collect()
    +        [(0, 0), (1, 1), (2, 2), (3, 3)]
    --- End diff --
    
    I will change it.
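    
    (A sketch of the kind of change implied here, as an assumption rather
    than the committed patch: flip `enumerate`'s (index, item) pairs so the
    item comes first, matching the Scala API's (item, index) order.)
    
    ```python
    def func(k, it):
        # emit (item, index) instead of enumerate's (index, item),
        # so the pair order matches Scala's zipWithIndex
        return ((x, i) for i, x in enumerate(it, starts[k]))
    ```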




[GitHub] spark pull request: [SPARK-2871] [PySpark] add zipWithIndex() and ...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the pull request:

    https://github.com/apache/spark/pull/2092#issuecomment-53225461
  
    LGTM, so I've merged this into `master`.  Thanks!




[GitHub] spark pull request: [SPARK-2871] [PySpark] add zipWithIndex() and ...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the pull request:

    https://github.com/apache/spark/pull/2092#issuecomment-53307538
  
    I also merged this into `branch-1.1`, since it's an often-requested feature and only adds code to this file (so it's low-risk to merge).




[GitHub] spark pull request: [SPARK-2871] [PySpark] add zipWithIndex() and ...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on the pull request:

    https://github.com/apache/spark/pull/2092#issuecomment-53180206
  
    Jenkins, test this please.




[GitHub] spark pull request: [SPARK-2871] [PySpark] add zipWithIndex() and ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2092#issuecomment-53180259
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19121/consoleFull) for PR 2092 at commit [`cebe5bf`](https://github.com/apache/spark/commit/cebe5bfe263baf3349353f1473f097396821514a).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-2871] [PySpark] add zipWithIndex() and ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2092#issuecomment-53022596
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19072/consoleFull) for PR 2092 at commit [`0d2a128`](https://github.com/apache/spark/commit/0d2a128da7f24214e6714a93f7402aaea93075f2).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-2871] [PySpark] add zipWithIndex() and ...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on the pull request:

    https://github.com/apache/spark/pull/2092#issuecomment-53145677
  
    I think doc tests should be enough.
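    
    (For context, a rough sketch of how those docstring examples get
    exercised. This assumes it mirrors the `_test()` harness pattern in
    `python/pyspark/rdd.py`, which seeds the doctest globals with a live
    SparkContext so the `>>> sc.parallelize(...)` lines run for real:)
    
    ```python
    import doctest
    from pyspark.context import SparkContext
    
    globs = globals().copy()
    # Bind a real local SparkContext as `sc` for the docstring examples.
    globs["sc"] = SparkContext("local[2]", "doctest run")
    failure_count, _ = doctest.testmod(globs=globs,
                                       optionflags=doctest.ELLIPSIS)
    globs["sc"].stop()
    if failure_count:
        exit(-1)
    ```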




[GitHub] spark pull request: [SPARK-2871] [PySpark] add zipWithIndex() and ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2092#issuecomment-53025408
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19072/consoleFull) for PR 2092 at commit [`0d2a128`](https://github.com/apache/spark/commit/0d2a128da7f24214e6714a93f7402aaea93075f2).
     * This patch **passes** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-2871] [PySpark] add zipWithIndex() and ...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2092#discussion_r16634476
  
    --- Diff: python/pyspark/rdd.py ---
    @@ -1715,6 +1715,52 @@ def batch_as(rdd, batchSize):
                                             other._jrdd_deserializer)
             return RDD(pairRDD, self.ctx, deserializer)
     
    +    def zipWithIndex(self):
    +        """
    +        Zips this RDD with its element indices.
    +
    +        The ordering is first based on the partition index and then the
    +        ordering of items within each partition. So the first item in
    +        the first partition gets index 0, and the last item in the last
    +        partition receives the largest index.
    +
    +        This method needs to trigger a Spark job when this RDD contains
    +        more than one partition.
    +
    +        >>> sc.parallelize(range(4), 2).zipWithIndex().collect()
    +        [(0, 0), (1, 1), (2, 2), (3, 3)]
    +        """
    +        starts = [0]
    +        if self.getNumPartitions() > 1:
    +            nums = self.mapPartitions(lambda it: [sum(1 for i in it)]).collect()
    +            for i in range(len(nums) - 1):
    +                starts.append(starts[-1] + nums[i])
    +
    +        def func(k, it):
    +            return enumerate(it, starts[k])
    +
    +        return self.mapPartitionsWithIndex(func)
    +
    +    def zipWithUniqueId(self):
    +        """
    +        Zips this RDD with generated unique Long ids.
    +
    +        Items in the kth partition will get ids k, n+k, 2*n+k, ..., where
    +        n is the number of partitions. So there may exist gaps, but this
    +        method won't trigger a Spark job, which is different from
    +        L{zipWithIndex}.
    +
    +        >>> sc.parallelize(range(4), 2).zipWithUniqueId().collect()
    --- End diff --
    
    good idea.




[GitHub] spark pull request: [SPARK-2871] [PySpark] add zipWithIndex() and ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2092#issuecomment-53181636
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19121/consoleFull) for PR 2092 at commit [`cebe5bf`](https://github.com/apache/spark/commit/cebe5bfe263baf3349353f1473f097396821514a).
     * This patch **passes** unit tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class BoundedFloat(float):`
      * `class JoinedRow2 extends Row `
      * `class JoinedRow3 extends Row `
      * `class JoinedRow4 extends Row `
      * `class JoinedRow5 extends Row `
      * `class GenericRow(protected[sql] val values: Array[Any]) extends Row `
      * `abstract class MutableValue extends Serializable `
      * `final class MutableInt extends MutableValue `
      * `final class MutableFloat extends MutableValue `
      * `final class MutableBoolean extends MutableValue `
      * `final class MutableDouble extends MutableValue `
      * `final class MutableShort extends MutableValue `
      * `final class MutableLong extends MutableValue `
      * `final class MutableByte extends MutableValue `
      * `final class MutableAny extends MutableValue `
      * `final class SpecificMutableRow(val values: Array[MutableValue]) extends MutableRow `
      * `case class CountDistinct(expressions: Seq[Expression]) extends PartialAggregate `
      * `case class CollectHashSet(expressions: Seq[Expression]) extends AggregateExpression `
      * `case class CollectHashSetFunction(`
      * `case class CombineSetsAndCount(inputSet: Expression) extends AggregateExpression `
      * `case class CombineSetsAndCountFunction(`
      * `case class CountDistinctFunction(`
      * `case class MaxOf(left: Expression, right: Expression) extends Expression `
      * `case class NewSet(elementType: DataType) extends LeafExpression `
      * `case class AddItemToSet(item: Expression, set: Expression) extends Expression `
      * `case class CombineSets(left: Expression, right: Expression) extends BinaryExpression `
      * `case class CountSet(child: Expression) extends UnaryExpression `





[GitHub] spark pull request: [SPARK-2871] [PySpark] add zipWithIndex() and ...

Posted by mattf <gi...@git.apache.org>.
Github user mattf commented on the pull request:

    https://github.com/apache/spark/pull/2092#issuecomment-53151265
  
    fair enough
    
    +1 lgtm




[GitHub] spark pull request: [SPARK-2871] [PySpark] add zipWithIndex() and ...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2092#discussion_r16633434
  
    --- Diff: python/pyspark/rdd.py ---
    @@ -1715,6 +1715,52 @@ def batch_as(rdd, batchSize):
                                             other._jrdd_deserializer)
             return RDD(pairRDD, self.ctx, deserializer)
     
    +    def zipWithIndex(self):
    +        """
    +        Zips this RDD with its element indices.
    +
    +        The ordering is first based on the partition index and then the
    +        ordering of items within each partition. So the first item in
    +        the first partition gets index 0, and the last item in the last
    +        partition receives the largest index.
    +
    +        This method needs to trigger a Spark job when this RDD contains
    +        more than one partition.
    +
    +        >>> sc.parallelize(range(4), 2).zipWithIndex().collect()
    +        [(0, 0), (1, 1), (2, 2), (3, 3)]
    +        """
    +        starts = [0]
    +        if self.getNumPartitions() > 1:
    +            nums = self.mapPartitions(lambda it: [sum(1 for i in it)]).collect()
    +            for i in range(len(nums) - 1):
    +                starts.append(starts[-1] + nums[i])
    +
    +        def func(k, it):
    +            return enumerate(it, starts[k])
    +
    +        return self.mapPartitionsWithIndex(func)
    +
    +    def zipWithUniqueId(self):
    +        """
    +        Zips this RDD with generated unique Long ids.
    +
    +        Items in the kth partition will get ids k, n+k, 2*n+k, ..., where
    +        n is the number of partitions. So there may exist gaps, but this
    +        method won't trigger a Spark job, which is different from
    +        L{zipWithIndex}.
    +
    +        >>> sc.parallelize(range(4), 2).zipWithUniqueId().collect()
    --- End diff --
    
    Here, it might be better to use three partitions (or some other value) so that there's a gap in the ids.
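    
    (To make that concrete, a Spark-free sketch of the id arithmetic with
    three partitions; the split of range(4) shown here is an assumption
    about how `parallelize` would slice it:)
    
    ```python
    # Hypothetical split of range(4) across 3 partitions.
    partitions = [[0], [1], [2, 3]]
    n = len(partitions)
    
    # Item i of partition k gets id i*n + k; with unequal partition
    # sizes, ids 3 and 4 are never assigned, so the gap is visible.
    print([(i * n + k, x)
           for k, part in enumerate(partitions)
           for i, x in enumerate(part)])
    # [(0, 0), (1, 1), (2, 2), (5, 3)]
    ```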




[GitHub] spark pull request: [SPARK-2871] [PySpark] add zipWithIndex() and ...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2092#discussion_r16633420
  
    --- Diff: python/pyspark/rdd.py ---
    @@ -1715,6 +1715,52 @@ def batch_as(rdd, batchSize):
                                             other._jrdd_deserializer)
             return RDD(pairRDD, self.ctx, deserializer)
     
    +    def zipWithIndex(self):
    +        """
    +        Zips this RDD with its element indices.
    +
    +        The ordering is first based on the partition index and then the
    +        ordering of items within each partition. So the first item in
    +        the first partition gets index 0, and the last item in the last
    +        partition receives the largest index.
    +
    +        This method needs to trigger a Spark job when this RDD contains
    +        more than one partition.
    +
    +        >>> sc.parallelize(range(4), 2).zipWithIndex().collect()
    +        [(0, 0), (1, 1), (2, 2), (3, 3)]
    --- End diff --
    
    This isn't the best example because it's not clear which element is the item and which element is its index.  In the Scala API, this is clear from the method's return type.  Maybe we should update the documentation to explicitly state that the second element is the id (like the Scala API).
    
    I think this implementation has things backwards w.r.t. the Scala one:
    
    ```python
    >>> sc.parallelize(['a', 'b', 'c', 'd'], 2).zipWithIndex().collect()
    [(0, 'a'), (1, 'b'), (2, 'c'), (3, 'd')]
    ```
    
    versus
    
    ```scala
    scala> sc.parallelize(Seq('a', 'b', 'c', 'd')).zipWithIndex().collect()
    res0: Array[(Char, Long)] = Array((a,0), (b,1), (c,2), (d,3))
    ```

