Posted to reviews@spark.apache.org by jtengyp <gi...@git.apache.org> on 2017/05/08 07:50:43 UTC

[GitHub] spark pull request #17898: Update CartesianRDD.scala

GitHub user jtengyp opened a pull request:

    https://github.com/apache/spark/pull/17898

    Update CartesianRDD.scala

    In compute, group each iterator into multiple batches, reducing repeated data fetching.
    
    ## What changes were proposed in this pull request?
    
    In compute, group each iterator into multiple batches. In the inner iteration, the second RDD's data is then fetched (number of elements) / groupSize times rather than (number of elements) times.
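    
    For illustration only, a minimal self-contained sketch of why batching reduces re-fetches. `left` and `fetchRight` are hypothetical stand-ins for the two partition iterators, with `fetchRight` counting how often the second side is recomputed:
    
    ```
    // Hypothetical stand-ins: fetchRight() models recomputing rdd2's
    // partition; counting its calls shows the reduction in re-fetches.
    var fetches = 0
    def left = Iterator.range(0, 1000)
    def fetchRight(): Iterator[Int] = { fetches += 1; Iterator.range(0, 1000) }
    
    val groupSize = 500
    val pairs = for {
      xs <- left.grouped(groupSize)         // two batches of 500
      ys <- fetchRight().grouped(groupSize) // right side fetched once per left batch
      x  <- xs
      y  <- ys
    } yield (x, y)
    
    println(pairs.size) // 1000000 pairs, same as the element-wise version
    println(fetches)    // 2 fetches instead of 1000
    ```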
    
    ## How was this patch tested?
    
    The existing UT.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/jtengyp/spark patch-1

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/17898.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #17898
    


[GitHub] spark pull request #17898: Optimize the CartesianRDD to reduce repeatedly da...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17898#discussion_r115210488
  
    --- Diff: core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala ---
    @@ -72,8 +72,10 @@ class CartesianRDD[T: ClassTag, U: ClassTag](
     
       override def compute(split: Partition, context: TaskContext): Iterator[(T, U)] = {
         val currSplit = split.asInstanceOf[CartesianPartition]
    -    for (x <- rdd1.iterator(currSplit.s1, context);
    -         y <- rdd2.iterator(currSplit.s2, context)) yield (x, y)
    +    val groupSize = 500;
    +    for (x <- rdd1.iterator(currSplit.s1, context).grouped(groupSize);
    +         y <- rdd2.iterator(currSplit.s2, context).grouped(groupSize);
    --- End diff --
    
    Pardon, doesn't this change the type of the result? You're iterating over groupings, not elements, and emitting pairs of groups, as below. But maybe I'm missing something.
    
    ```
    scala> val foo = List(1,2,3)
    foo: List[Int] = List(1, 2, 3)
    
    scala> val bar = List(4,5,6)
    bar: List[Int] = List(4, 5, 6)
    
    scala> for (x <- foo; y <- bar) yield (x, y)
    res0: List[(Int, Int)] = List((1,4), (1,5), (1,6), (2,4), (2,5), (2,6), (3,4), (3,5), (3,6))
    
    scala> (for (x <- foo.grouped(2); y <- bar.grouped(2)) yield (x, y)).foreach(println)
    (List(1, 2),List(4, 5))
    (List(1, 2),List(6))
    (List(3),List(4, 5))
    (List(3),List(6))
    ```



[GitHub] spark issue #17898: Optimize the CartesianRDD

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/17898
  
    Maybe create a JIRA and update the title per the Spark PR convention. Since this is meant as a performance improvement, the measured difference is expected to be shown.



[GitHub] spark pull request #17898: Optimize the CartesianRDD to reduce repeatedly da...

Posted by ConeyLiu <gi...@git.apache.org>.
Github user ConeyLiu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17898#discussion_r115232250
  
    --- Diff: core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala ---
    @@ -72,8 +72,10 @@ class CartesianRDD[T: ClassTag, U: ClassTag](
     
       override def compute(split: Partition, context: TaskContext): Iterator[(T, U)] = {
         val currSplit = split.asInstanceOf[CartesianPartition]
    -    for (x <- rdd1.iterator(currSplit.s1, context);
    -         y <- rdd2.iterator(currSplit.s2, context)) yield (x, y)
    +    val groupSize = 500;
    +    for (x <- rdd1.iterator(currSplit.s1, context).grouped(groupSize);
    +         y <- rdd2.iterator(currSplit.s2, context).grouped(groupSize);
    --- End diff --
    
    I'm working on this too, but the optimization may be similar to the PR @viirya opened before: cache the second iterator locally. The code is ready; I may open a PR soon. With this patch, I worry about whether we can accurately control the size of the buffer. Should we cache it via `BlockManager` or `TaskConsumer`?



[GitHub] spark pull request #17898: Optimize the CartesianRDD to reduce repeatedly da...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17898#discussion_r115211058
  
    --- Diff: core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala ---
    @@ -72,8 +72,10 @@ class CartesianRDD[T: ClassTag, U: ClassTag](
     
       override def compute(split: Partition, context: TaskContext): Iterator[(T, U)] = {
         val currSplit = split.asInstanceOf[CartesianPartition]
    -    for (x <- rdd1.iterator(currSplit.s1, context);
    -         y <- rdd2.iterator(currSplit.s2, context)) yield (x, y)
    +    val groupSize = 500;
    +    for (x <- rdd1.iterator(currSplit.s1, context).grouped(groupSize);
    +         y <- rdd2.iterator(currSplit.s2, context).grouped(groupSize);
    --- End diff --
    
    The actual yield is on (i, j) and not (x, y) - the next line adds the iteration over the groupings :-)
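    
    (The quoted diff is truncated before those lines; presumably the full comprehension has roughly the following shape, with the inner generators flattening the batches back into element pairs so the result is still `Iterator[(T, U)]`:)
    
    ```
    // Presumed shape of the full change; the diff above is cut off.
    for (x <- rdd1.iterator(currSplit.s1, context).grouped(groupSize);
         y <- rdd2.iterator(currSplit.s2, context).grouped(groupSize);
         i <- x; j <- y) yield (i, j)
    ```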



[GitHub] spark pull request #17898: Optimize the CartesianRDD

Posted by jtengyp <gi...@git.apache.org>.
Github user jtengyp commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17898#discussion_r115199537
  
    --- Diff: core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala ---
    @@ -72,8 +72,10 @@ class CartesianRDD[T: ClassTag, U: ClassTag](
     
       override def compute(split: Partition, context: TaskContext): Iterator[(T, U)] = {
         val currSplit = split.asInstanceOf[CartesianPartition]
    -    for (x <- rdd1.iterator(currSplit.s1, context);
    -         y <- rdd2.iterator(currSplit.s2, context)) yield (x, y)
    +    val groupSize = 500;
    +    for (x <- rdd1.iterator(currSplit.s1, context).grouped(groupSize);
    +         y <- rdd2.iterator(currSplit.s2, context).grouped(groupSize);
    --- End diff --
    
    This is indeed a disadvantage.



[GitHub] spark pull request #17898: [SPARK-20638][Core]Optimize the CartesianRDD to r...

Posted by jtengyp <gi...@git.apache.org>.
Github user jtengyp closed the pull request at:

    https://github.com/apache/spark/pull/17898



[GitHub] spark issue #17898: Optimize the CartesianRDD to reduce repeatedly data fetc...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17898
  
    **[Test build #3697 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/3697/testReport)** for PR 17898 at commit [`d2cbcdd`](https://github.com/apache/spark/commit/d2cbcddab12a62f743fd78afc5d483f3ce161868).



[GitHub] spark pull request #17898: Optimize the CartesianRDD to reduce repeatedly da...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17898#discussion_r115211527
  
    --- Diff: core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala ---
    @@ -72,8 +72,10 @@ class CartesianRDD[T: ClassTag, U: ClassTag](
     
       override def compute(split: Partition, context: TaskContext): Iterator[(T, U)] = {
         val currSplit = split.asInstanceOf[CartesianPartition]
    -    for (x <- rdd1.iterator(currSplit.s1, context);
    -         y <- rdd2.iterator(currSplit.s2, context)) yield (x, y)
    +    val groupSize = 500;
    +    for (x <- rdd1.iterator(currSplit.s1, context).grouped(groupSize);
    +         y <- rdd2.iterator(currSplit.s2, context).grouped(groupSize);
    --- End diff --
    
    I agree with @viirya. There is also an implicit assumption about size here: the batch will get deserialized into memory.
    By default, we have kept the iterator model going in Spark without needing to buffer (IIRC).



[GitHub] spark pull request #17898: Optimize the CartesianRDD to reduce repeatedly da...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17898#discussion_r115269762
  
    --- Diff: core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala ---
    @@ -72,8 +72,10 @@ class CartesianRDD[T: ClassTag, U: ClassTag](
     
       override def compute(split: Partition, context: TaskContext): Iterator[(T, U)] = {
         val currSplit = split.asInstanceOf[CartesianPartition]
    -    for (x <- rdd1.iterator(currSplit.s1, context);
    -         y <- rdd2.iterator(currSplit.s2, context)) yield (x, y)
    +    val groupSize = 500;
    +    for (x <- rdd1.iterator(currSplit.s1, context).grouped(groupSize);
    +         y <- rdd2.iterator(currSplit.s2, context).grouped(groupSize);
    --- End diff --
    
    Oh haha, right. Hm, but isn't this better solved 'upstream' by buffering an iterator somewhere? Or just buffering the iterator right here?
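    
    (As a rough sketch of 'buffering the iterator right here', assuming the second partition fits in memory; not a final design:)
    
    ```
    // Sketch only: materialize rdd2's partition once per task and reuse it,
    // trading memory for repeated fetches.
    val buffered = rdd2.iterator(currSplit.s2, context).toArray
    for (x <- rdd1.iterator(currSplit.s1, context); y <- buffered.iterator)
      yield (x, y)
    ```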



[GitHub] spark issue #17898: Optimize the CartesianRDD to reduce repeatedly data fetc...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17898
  
    **[Test build #3697 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/3697/testReport)** for PR 17898 at commit [`d2cbcdd`](https://github.com/apache/spark/commit/d2cbcddab12a62f743fd78afc5d483f3ce161868).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.



[GitHub] spark issue #17898: Update CartesianRDD.scala

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17898
  
    Can one of the admins verify this patch?



[GitHub] spark issue #17898: [SPARK-20638][Core]Optimize the CartesianRDD to reduce r...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on the issue:

    https://github.com/apache/spark/pull/17898
  
    @jtengyp I think we won't proceed with this version, so this can be closed, but see the discussion at https://github.com/apache/spark/pull/17936



[GitHub] spark issue #17898: Update CartesianRDD.scala

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on the issue:

    https://github.com/apache/spark/pull/17898
  
    Please read http://spark.apache.org/contributing.html
    For example, "Update X" is never sufficient as a title.
    How does this avoid fetching? How much difference does it make?



[GitHub] spark pull request #17898: Optimize the CartesianRDD

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17898#discussion_r115199237
  
    --- Diff: core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala ---
    @@ -72,8 +72,10 @@ class CartesianRDD[T: ClassTag, U: ClassTag](
     
       override def compute(split: Partition, context: TaskContext): Iterator[(T, U)] = {
         val currSplit = split.asInstanceOf[CartesianPartition]
    -    for (x <- rdd1.iterator(currSplit.s1, context);
    -         y <- rdd2.iterator(currSplit.s2, context)) yield (x, y)
    +    val groupSize = 500;
    +    for (x <- rdd1.iterator(currSplit.s1, context).grouped(groupSize);
    +         y <- rdd2.iterator(currSplit.s2, context).grouped(groupSize);
    --- End diff --
    
    One disadvantage I can think of now is a longer wait for the first element.
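    
    (A quick way to see it, with a hypothetical slow upstream iterator: `grouped()` must fill a whole batch before emitting anything, so the first downstream element now waits for `groupSize` upstream elements.)
    
    ```
    // Hypothetical: each upstream element takes ~1 ms to produce.
    def slow = Iterator.tabulate(1000) { i => Thread.sleep(1); i }
    slow.next()              // first element after ~1 ms
    slow.grouped(500).next() // first batch only after ~500 ms of reads
    ```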



[GitHub] spark issue #17898: Optimize the CartesianRDD to reduce repeatedly data fetc...

Posted by jtengyp <gi...@git.apache.org>.
Github user jtengyp commented on the issue:

    https://github.com/apache/spark/pull/17898
  
    Here is my test:
    Environment: 3 workers, each with 10 cores and 30 GB memory, and 1 executor.
    Test data: 480,189 users and 17,770 items, each a 10-dimensional vector.
    With the default CartesianRDD, the cartesian step takes 2420.7 s.
    With this proposal, it takes 45.3 s, about 50x faster than the original method.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org