You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by ConeyLiu <gi...@git.apache.org> on 2017/05/10 08:38:23 UTC

[GitHub] spark pull request #17936: [SPARK-20638][Core][WIP]Optimize the CartesianRDD...

GitHub user ConeyLiu opened a pull request:

    https://github.com/apache/spark/pull/17936

    [SPARK-20638][Core][WIP]Optimize the CartesianRDD to reduce repeatedly data fetching

    ## What changes were proposed in this pull request?
    
    This path aims to solve the poor performance of `RDD.cartesian`. In the original method of `cartesian`, it need repeatedly fetch remotely data or recompute, so the performance is poor. In this path cache the second partition into the local with `BlockManager`. There are two advantage:
    
    - Because we cache it with `BlockManager` and set the storage level as `MEMORY_AND_DISK`, so we don't need care the OOM caused by the buffer.
    
    - Many task may depend on the same block (the second partition) for calculation, so don't remove the block when other task need it. This can reduce the times of fetching or calculate.
    
    
    ## How was this patch tested?
    Test enviroments:  4 Executors(10 core, 30GB Mem) in one node with(4 ssd + 7hdd)
    Test case:
    ```
    def randomValue(): String = {
          val random = Random.alphanumeric
          random.take(100).mkString
        }
    
        val keys = sc.parallelize(1 to 10000L)
        val data1 = keys.map(id => (id, randomValue()))
        val data2 = keys.map(id => (id, randomValue()))
    
        data1.repartition(30)
        data2.repartition(30)
    
        val pairs = data1.cartesian(data2).filter {
          case (x, y) => StringUtils.getLevenshteinDistance(x._2, y._2) > 0.3
        }
    
        val start = System.nanoTime()
        pairs.count()
        println((System.nanoTime() - start) / 1e6)
    ```
    
    Before:
    353491.027379
    After:
    94516.680067


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ConeyLiu/spark cartesian

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/17936.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #17936
    
----
commit 1802ff0e12359240ebc40223dff351e3d4886002
Author: Xianyang Liu <xi...@intel.com>
Date:   2017-05-09T15:11:12Z

    optimize rdd cartesian with caching the block in local

commit 08e25c9c6ca723383fc3af20137db356a8f65262
Author: Xianyang Liu <xi...@intel.com>
Date:   2017-05-09T15:20:05Z

    add test case

commit 0f812d944c1db12faa7f03059ec49c2c377dfcc9
Author: Xianyang Liu <xi...@intel.com>
Date:   2017-05-10T03:24:28Z

    put block into local with read block

commit 08c1849e7fbc3edd6d4c5c60e2fab93e9cb1ee03
Author: Xianyang Liu <xi...@intel.com>
Date:   2017-05-10T07:47:46Z

    follow the code style and add some comments

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #17936: [SPARK-20638][Core]Optimize the CartesianRDD to r...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17936#discussion_r116959739
  
    --- Diff: core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala ---
    @@ -71,9 +72,85 @@ class CartesianRDD[T: ClassTag, U: ClassTag](
       }
     
       override def compute(split: Partition, context: TaskContext): Iterator[(T, U)] = {
    +    val blockManager = SparkEnv.get.blockManager
         val currSplit = split.asInstanceOf[CartesianPartition]
    -    for (x <- rdd1.iterator(currSplit.s1, context);
    -         y <- rdd2.iterator(currSplit.s2, context)) yield (x, y)
    +    val blockId2 = RDDBlockId(rdd2.id, currSplit.s2.index)
    +    var cachedInLocal = false
    +    var holdReadLock = false
    +
    +    // Try to get data from the local, otherwise it will be cached to the local.
    +    def getOrElseCache(
    +        rdd: RDD[U],
    +        partition: Partition,
    +        context: TaskContext,
    +        level: StorageLevel): Iterator[U] = {
    +      getLocalValues() match {
    +        case Some(result) =>
    +          return result
    +        case None => // do nothing
    +      }
    +
    +      val iterator = rdd.iterator(partition, context)
    +      // Keep read lock, because next we need read it. And don't tell master.
    +      blockManager.putIterator[U](blockId2, iterator, level, false, true) match {
    +        case true =>
    --- End diff --
    
    BTW I think match is confusing overkill for booleans. Just use if-else


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17936: [SPARK-20638][Core][WIP]Optimize the CartesianRDD to red...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on the issue:

    https://github.com/apache/spark/pull/17936
  
    Looks like there's a similar PR #17898 trying to address this issue, can you please elaborate your difference compared to that one?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17936: [SPARK-20638][Core]Optimize the CartesianRDD to reduce r...

Posted by suyanNone <gi...@git.apache.org>.
Github user suyanNone commented on the issue:

    https://github.com/apache/spark/pull/17936
  
     May create a MemoryAndDiskArray like ExternalAppendOnlyMap? MemoryAndDiskArray, not also use here and also groupByKey?  and it memory can controller by MemoryManager


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17936: [SPARK-20638][Core]Optimize the CartesianRDD to reduce r...

Posted by ConeyLiu <gi...@git.apache.org>.
Github user ConeyLiu commented on the issue:

    https://github.com/apache/spark/pull/17936
  
    Hi, @squito, @cloud-fan. Can you help review this code?  Thanks a lot. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #17936: [SPARK-20638][Core]Optimize the CartesianRDD to r...

Posted by ConeyLiu <gi...@git.apache.org>.
Github user ConeyLiu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17936#discussion_r117425810
  
    --- Diff: core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala ---
    @@ -71,9 +72,92 @@ class CartesianRDD[T: ClassTag, U: ClassTag](
       }
     
       override def compute(split: Partition, context: TaskContext): Iterator[(T, U)] = {
    +    val blockManager = SparkEnv.get.blockManager
         val currSplit = split.asInstanceOf[CartesianPartition]
    -    for (x <- rdd1.iterator(currSplit.s1, context);
    -         y <- rdd2.iterator(currSplit.s2, context)) yield (x, y)
    +    val blockId2 = RDDBlockId(rdd2.id, currSplit.s2.index)
    +    var cachedInLocal = false
    +    var holdReadLock = false
    +
    +    // Try to get data from the local, otherwise it will be cached to the local.
    +    def getOrElseCache(
    +        rdd: RDD[U],
    +        partition: Partition,
    +        context: TaskContext,
    +        level: StorageLevel): Iterator[U] = {
    +      getLocalValues() match {
    +        case Some(result) =>
    +          return result
    +        case None => if (holdReadLock) {
    +          throw new SparkException(s"get() failed for block $blockId2 even though we held a lock")
    +        }
    +      }
    +
    +      val iterator = rdd.iterator(partition, context)
    +      if (rdd.getStorageLevel != StorageLevel.NONE || rdd.isCheckpointedAndMaterialized) {
    +        // If the block is cached in local, wo shouldn't cache it again.
    --- End diff --
    
    Yeah, but if it isn't cached in local, the next loop will try call the iterator again, then we will call the `getOrElse`. You means we should check if it is cached, try cached it again?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #17936: [SPARK-20638][Core]Optimize the CartesianRDD to r...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17936#discussion_r117429928
  
    --- Diff: core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala ---
    @@ -71,9 +72,92 @@ class CartesianRDD[T: ClassTag, U: ClassTag](
       }
     
       override def compute(split: Partition, context: TaskContext): Iterator[(T, U)] = {
    +    val blockManager = SparkEnv.get.blockManager
         val currSplit = split.asInstanceOf[CartesianPartition]
    -    for (x <- rdd1.iterator(currSplit.s1, context);
    -         y <- rdd2.iterator(currSplit.s2, context)) yield (x, y)
    +    val blockId2 = RDDBlockId(rdd2.id, currSplit.s2.index)
    +    var cachedInLocal = false
    +    var holdReadLock = false
    +
    +    // Try to get data from the local, otherwise it will be cached to the local.
    +    def getOrElseCache(
    --- End diff --
    
    Btw, can we move those functions out of `compute`? Too many nested functions here and making `compute` too big.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #17936: [SPARK-20638][Core]Optimize the CartesianRDD to r...

Posted by ConeyLiu <gi...@git.apache.org>.
Github user ConeyLiu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17936#discussion_r117393634
  
    --- Diff: core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala ---
    @@ -198,8 +198,12 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext
         // write files to disk so we can read them later.
         sc.parallelize(cartVector).saveAsTextFile(cartFilePath)
         val aRdd = sc.textFile(cartFilePath, numPartitions)
    +    aRdd.cache()
    +    aRdd.count()
     
         val tmpRdd = sc.textFile(tmpFilePath, numPartitions)
    +    tmpRdd.cache()
    +    tmpRdd.count()
    --- End diff --
    
    Because we cache the rdd in the CartesianRDD compute method, so there we should count the bytes read from memory.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17936: [SPARK-20638][Core]Optimize the CartesianRDD to reduce r...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/17936
  
    @jerryshao As you mentioned broadcasting, another question might be, can we just use broadcasting to achieve similar performance without such changes?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17936: [SPARK-20638][Core]Optimize the CartesianRDD to reduce r...

Posted by ConeyLiu <gi...@git.apache.org>.
Github user ConeyLiu commented on the issue:

    https://github.com/apache/spark/pull/17936
  
    OK, thanks a lot.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17936: [SPARK-20638][Core]Optimize the CartesianRDD to reduce r...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/17936
  
    Seems it should be still better than original cartesian, since it saves re-computing RDD, re-transferring data?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17936: [SPARK-20638][Core]Optimize the CartesianRDD to reduce r...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/17936
  
    In Spark SQL we have `UnsafeCartesianRDD` which already has this optimization so this patch won't benifit Spark SQL.
    
    As we are encouraging users to use Spark SQL as the main programing interface instead of RDD, it seems to me that this patch is not very useful for Spark.
    
    BTW I think it's hard to optimize `CartesianRDD` without regression, IIRC there were many PRs try to optimize it but didn't get a consensus.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17936: [SPARK-20638][Core]Optimize the CartesianRDD to reduce r...

Posted by ConeyLiu <gi...@git.apache.org>.
Github user ConeyLiu commented on the issue:

    https://github.com/apache/spark/pull/17936
  
    Hi @rxin, would you mind take a look?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17936: [SPARK-20638][Core][WIP]Optimize the CartesianRDD to red...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17936
  
    **[Test build #3708 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/3708/testReport)** for PR 17936 at commit [`08c1849`](https://github.com/apache/spark/commit/08c1849e7fbc3edd6d4c5c60e2fab93e9cb1ee03).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17936: [SPARK-20638][Core]Optimize the CartesianRDD to reduce r...

Posted by ConeyLiu <gi...@git.apache.org>.
Github user ConeyLiu commented on the issue:

    https://github.com/apache/spark/pull/17936
  
    Sorry for the mistake, this test result should be the cached situation:
    | ------| ------ | ------ |
    | 15.877s | 2827.373s | 178x |
    | 16.781s | 2809.502s | 167x |
    | 16.320s | 2845.699s | 174x |
    | 19.437s | 2860.387s | 147x |
    | 16.793s | 2931.667s | 174x|
    
    Test case:
    ```
    object TestNetflixlib {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("Test Netflix mlib")
        val sc = new SparkContext(conf)
    
        val data = sc.textFile("hdfs://10.1.2.173:9000/nf_training_set.txt")
    
        val ratings = data.map(_.split("::") match {
          case Array(user, item, rate) => Rating(user.toInt, item.toInt, rate.toDouble)
        })
    
        val rank = 0
        val numIterations = 10
        val train_start = System.nanoTime()
        val model = ALS.train(ratings, rank, numIterations, 0.01)
        val user = model.userFeatures
        val item = model.productFeatures
        val start = System.nanoTime()
        val rate = user.cartesian(item)
        println(rate.count())
        val time = (System.nanoTime() - start) / 1e9
        println(time)
      }
    }
    ```
    
    The RDDs (user and item) should be cached.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17936: [SPARK-20638][Core]Optimize the CartesianRDD to reduce r...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/17936
  
    I agreed with @srowen. This adds quite complexity. If there is no much difference comparing with caching RDDs before doing cartesian (or other ways), it may not worth such complexity.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17936: [SPARK-20638][Core]Optimize the CartesianRDD to reduce r...

Posted by ConeyLiu <gi...@git.apache.org>.
Github user ConeyLiu commented on the issue:

    https://github.com/apache/spark/pull/17936
  
    OK, I'll add it. From the test data, performance is still very obvious. Mainly from the network and disk overhead.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #17936: [SPARK-20638][Core]Optimize the CartesianRDD to r...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17936#discussion_r117424810
  
    --- Diff: core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala ---
    @@ -71,9 +72,92 @@ class CartesianRDD[T: ClassTag, U: ClassTag](
       }
     
       override def compute(split: Partition, context: TaskContext): Iterator[(T, U)] = {
    +    val blockManager = SparkEnv.get.blockManager
         val currSplit = split.asInstanceOf[CartesianPartition]
    -    for (x <- rdd1.iterator(currSplit.s1, context);
    -         y <- rdd2.iterator(currSplit.s2, context)) yield (x, y)
    +    val blockId2 = RDDBlockId(rdd2.id, currSplit.s2.index)
    +    var cachedInLocal = false
    +    var holdReadLock = false
    +
    +    // Try to get data from the local, otherwise it will be cached to the local.
    +    def getOrElseCache(
    +        rdd: RDD[U],
    +        partition: Partition,
    +        context: TaskContext,
    +        level: StorageLevel): Iterator[U] = {
    +      getLocalValues() match {
    +        case Some(result) =>
    +          return result
    +        case None => if (holdReadLock) {
    +          throw new SparkException(s"get() failed for block $blockId2 even though we held a lock")
    +        }
    +      }
    +
    +      val iterator = rdd.iterator(partition, context)
    +      if (rdd.getStorageLevel != StorageLevel.NONE || rdd.isCheckpointedAndMaterialized) {
    +        // If the block is cached in local, wo shouldn't cache it again.
    --- End diff --
    
    `getOrElseUpdate` doesn't guarantee the block can be successfully cached. It can be failed to cache it. In this case, it simply returns 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #17936: [SPARK-20638][Core]Optimize the CartesianRDD to r...

Posted by ConeyLiu <gi...@git.apache.org>.
Github user ConeyLiu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17936#discussion_r117427240
  
    --- Diff: core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala ---
    @@ -71,9 +72,92 @@ class CartesianRDD[T: ClassTag, U: ClassTag](
       }
     
       override def compute(split: Partition, context: TaskContext): Iterator[(T, U)] = {
    +    val blockManager = SparkEnv.get.blockManager
         val currSplit = split.asInstanceOf[CartesianPartition]
    -    for (x <- rdd1.iterator(currSplit.s1, context);
    -         y <- rdd2.iterator(currSplit.s2, context)) yield (x, y)
    +    val blockId2 = RDDBlockId(rdd2.id, currSplit.s2.index)
    +    var cachedInLocal = false
    +    var holdReadLock = false
    +
    +    // Try to get data from the local, otherwise it will be cached to the local.
    +    def getOrElseCache(
    +        rdd: RDD[U],
    +        partition: Partition,
    +        context: TaskContext,
    +        level: StorageLevel): Iterator[U] = {
    +      getLocalValues() match {
    +        case Some(result) =>
    +          return result
    +        case None => if (holdReadLock) {
    +          throw new SparkException(s"get() failed for block $blockId2 even though we held a lock")
    +        }
    +      }
    +
    +      val iterator = rdd.iterator(partition, context)
    +      if (rdd.getStorageLevel != StorageLevel.NONE || rdd.isCheckpointedAndMaterialized) {
    +        // If the block is cached in local, wo shouldn't cache it again.
    --- End diff --
    
    Ok, I'll change it, thanks very much.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17936: [SPARK-20638][Core]Optimize the CartesianRDD to reduce r...

Posted by ConeyLiu <gi...@git.apache.org>.
Github user ConeyLiu commented on the issue:

    https://github.com/apache/spark/pull/17936
  
    @srowen  Sorry for the late reply. I updated the code. Because we should reduce times of the remotely fetch, the second partition should be cached in locally. There are two ways, first cached by the `TaskConsumer` which controlled by the `Execution Memory`(this methods seems #9969); Second, cached by the `BlockManager` which controlled by the `Storage Memory`.  Through the experiment found that the first way gc problem is very serious. 
    
    Cartesian only used in `ALS` and `UnsafeCartesianRDD`. However, the latter itself implements a `Cartesian`, you can see as follow:
    ```
    class UnsafeCartesianRDD(
        left : RDD[UnsafeRow],
        right : RDD[UnsafeRow],
        numFieldsOfRight: Int,
        spillThreshold: Int)
      extends CartesianRDD[UnsafeRow, UnsafeRow](left.sparkContext, left, right) {
    
      override def compute(split: Partition, context: TaskContext): Iterator[(UnsafeRow, UnsafeRow)] = {
        val rowArray = new ExternalAppendOnlyUnsafeRowArray(spillThreshold)
    
        val partition = split.asInstanceOf[CartesianPartition]
        rdd2.iterator(partition.s2, context).foreach(rowArray.add)
    
        // Create an iterator from rowArray
        def createIter(): Iterator[UnsafeRow] = rowArray.generateIterator()
    
        val resultIter =
          for (x <- rdd1.iterator(partition.s1, context);
               y <- createIter()) yield (x, y)
        CompletionIterator[(UnsafeRow, UnsafeRow), Iterator[(UnsafeRow, UnsafeRow)]](
          resultIter, rowArray.clear())
      }
    }
    ```
    So I think there should be no other impact.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17936: [SPARK-20638][Core][WIP]Optimize the CartesianRDD to red...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on the issue:

    https://github.com/apache/spark/pull/17936
  
    From my first glance, I have several questions:
    
    1. If the parent's partition has already been cached in local blockmanager, do we need to cache again?
    2. There will be situation several tasks are waiting for one task to materialize the iterator into blockmanager, can we improve this?
    3. If the memory is not enough, is it always faster to read from disk than to re-computation from parent partition?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17936: [SPARK-20638][Core][WIP]Optimize the CartesianRDD to red...

Posted by ConeyLiu <gi...@git.apache.org>.
Github user ConeyLiu commented on the issue:

    https://github.com/apache/spark/pull/17936
  
    hi @jerryshao,thanks for your review. In #17898,there is a potential buffer to cache the data,so we should control the groupsize very careful. Because for small size,it need fetch more times. For lager,there is potential OOM. So,in this pr we using blockmanager to cache it. And the block cached can be used multiple task in same executor.  But that patch changed little.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17936: [SPARK-20638][Core]Optimize the CartesianRDD to reduce r...

Posted by chenghao-intel <gi...@git.apache.org>.
Github user chenghao-intel commented on the issue:

    https://github.com/apache/spark/pull/17936
  
    I can understand any code change in Spark core will be hard to review due to the regression concern, I think we can leave the PR for discussion.
    1) Actually the `UnsafeCartesianRDD` doesn't aware the block locality and will re-fetch the data from remote even the data has been fetched by another local node task, that's why we have to change some code in `BlockManager`.
    2) For some existing application based on RDD, like the `MLLib` still are using the `CartesianRDD`, and we can observe 50x performance boosting in ALS prediction. Previously even we couldn't finish the ALS predication without this optimization until we well tuning lots of things.
    3) Repeatable data block iterations probably very useful for new API implementations like Cartesian Product for Machine Learning due to performance concern, unfortunately the `BlockManager` doesn't provide this feature, and we may add some other operations based on this improvement in the future, that's why we think it's important.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17936: [SPARK-20638][Core]Optimize the CartesianRDD to reduce r...

Posted by ConeyLiu <gi...@git.apache.org>.
Github user ConeyLiu commented on the issue:

    https://github.com/apache/spark/pull/17936
  
    Hi @jerryshao . Good advice. Because here choose `MEMORY_AND_DISK`, it should be failed from the logic of `blockManager.putIterator`, or else the error should be irrevesible. Maybe I understand the wrong, please pointer. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17936: [SPARK-20638][Core][WIP]Optimize the CartesianRDD to red...

Posted by ConeyLiu <gi...@git.apache.org>.
Github user ConeyLiu commented on the issue:

    https://github.com/apache/spark/pull/17936
  
    A cluster version of the comparison results, I will be given later.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #17936: [SPARK-20638][Core]Optimize the CartesianRDD to r...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17936#discussion_r117426327
  
    --- Diff: core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala ---
    @@ -71,9 +72,92 @@ class CartesianRDD[T: ClassTag, U: ClassTag](
       }
     
       override def compute(split: Partition, context: TaskContext): Iterator[(T, U)] = {
    +    val blockManager = SparkEnv.get.blockManager
         val currSplit = split.asInstanceOf[CartesianPartition]
    -    for (x <- rdd1.iterator(currSplit.s1, context);
    -         y <- rdd2.iterator(currSplit.s2, context)) yield (x, y)
    +    val blockId2 = RDDBlockId(rdd2.id, currSplit.s2.index)
    +    var cachedInLocal = false
    +    var holdReadLock = false
    +
    +    // Try to get data from the local, otherwise it will be cached to the local.
    +    def getOrElseCache(
    +        rdd: RDD[U],
    +        partition: Partition,
    +        context: TaskContext,
    +        level: StorageLevel): Iterator[U] = {
    +      getLocalValues() match {
    +        case Some(result) =>
    +          return result
    +        case None => if (holdReadLock) {
    +          throw new SparkException(s"get() failed for block $blockId2 even though we held a lock")
    +        }
    +      }
    +
    +      val iterator = rdd.iterator(partition, context)
    +      if (rdd.getStorageLevel != StorageLevel.NONE || rdd.isCheckpointedAndMaterialized) {
    +        // If the block is cached in local, wo shouldn't cache it again.
    --- End diff --
    
    No. I mean here if `getStorageLevel != StorageLevel.NONE`, you assume the block is cached and return the iterator. However, the caching can be failed and you just return the computed iterator.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #17936: [SPARK-20638][Core]Optimize the CartesianRDD to r...

Posted by ConeyLiu <gi...@git.apache.org>.
Github user ConeyLiu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17936#discussion_r117393923
  
    --- Diff: core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala ---
    @@ -198,8 +198,12 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext
         // write files to disk so we can read them later.
         sc.parallelize(cartVector).saveAsTextFile(cartFilePath)
         val aRdd = sc.textFile(cartFilePath, numPartitions)
    +    aRdd.cache()
    +    aRdd.count()
    --- End diff --
    
    There is a very strange mistake. If we cache both `aRdd` & `tmpRdd`,  this pr and master branch all pasted the test. But if we just cache the `tmpRdd`, both the branch are failed. So here are temporarily set to cache. I will look at the details of the problem, it may be a bug, if I understand the wrong me, please pointer me.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17936: [SPARK-20638][Core]Optimize the CartesianRDD to reduce r...

Posted by ConeyLiu <gi...@git.apache.org>.
Github user ConeyLiu commented on the issue:

    https://github.com/apache/spark/pull/17936
  
    Yeah, I can test it.  You see, the `ALS` is an pratical use case. So, choose it as a test case more convincing. And I also want to see the improvement of this `pr` even after merged #17742.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17936: [SPARK-20638][Core][WIP]Optimize the CartesianRDD to red...

Posted by ConeyLiu <gi...@git.apache.org>.
Github user ConeyLiu commented on the issue:

    https://github.com/apache/spark/pull/17936
  
    The cluster test result. The `RDD.cartesian` is used in Spark mllib ALS algorithm, and compared with the latest spark master branch. 
    
    Environments:  Spark on Yarn with 9 executors(10 cores & 30 GB Mem) on three nodes.
    Test Data: The Data: User 480,000, and Item 17,000.
    
    Test Case:
    ```
    object TestNetflixlib {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("Test Netflix mlib")
        val sc = new SparkContext(conf)
    
        val data = sc.textFile("hdfs://10.1.2.173:9000/nf_training_set.txt")
    
        val ratings = data.map(_.split("::") match {
          case Array(user, item, rate) => Rating(user.toInt, item.toInt, rate.toDouble)
        })
    
        val rank = 0
        val numIterations = 10
        val train_start = System.nanoTime()
        val model = ALS.train(ratings, rank, numIterations, 0.01)
        val training_time = (System.nanoTime() - train_start)/ 1e9
        println(s"Training time(s): $training_time")
    
        val rec_start = System.nanoTime()
        val userRec = model.recommendProductsForUsers(20)
        println(userRec.count())
        val rec_time = (System.nanoTime() - rec_start) / 1e9
        println(s"Recommend time(s): $rec_time")
      }
    }
    ```
    
    Test Results:
    | Master Branch | Improved Branch | Percentage of ascension |
    | ------| ------ | ------ |
    | 139.934s | 162.597s | 16 % |
    | 148.138s | 157.597s | 6% |
    | 157.899s | 189.580s | 20% |
    | 135.520s | 152.486s | 13% |
    | 166.101s | 184.485s | 11 % |


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17936: [SPARK-20638][Core]Optimize the CartesianRDD to reduce r...

Posted by ConeyLiu <gi...@git.apache.org>.
Github user ConeyLiu commented on the issue:

    https://github.com/apache/spark/pull/17936
  
    `Broadcast` should first fetch the all block to driver, and cached in the local, then the executor fetch it from the driver. I think it's really time consuming.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17936: [SPARK-20638][Core]Optimize the CartesianRDD to reduce r...

Posted by ConeyLiu <gi...@git.apache.org>.
Github user ConeyLiu commented on the issue:

    https://github.com/apache/spark/pull/17936
  
    Hi, @jtengyp the test Results as follow:
    
    | Improved Branch | Master Branch | Percentage of ascension |
    | ------| ------ | ------ |
    | 15.877s | 2827.373s | 178x |
    | 16.781s | 2809.502s | 167x |
    | 16.320s | 2845.699s | 174x |
    | 19.437s | 2860.387s | 147x |
    | 16.793s | 2931.667s | 174x|



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #17936: [SPARK-20638][Core]Optimize the CartesianRDD to r...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/17936


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17936: [SPARK-20638][Core]Optimize the CartesianRDD to reduce r...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on the issue:

    https://github.com/apache/spark/pull/17936
  
    @viirya , this is slightly different from caching RDD. It is more like broadcasting, the final state is that each executor will hold the whole data of RDD2, the difference is that this is executor-executor sync, not driver-executor sync.
    
    I also have the similar concern. The performance can be varied by workloads, we'd better have some different workloads to see general improvements.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17936: [SPARK-20638][Core][WIP]Optimize the CartesianRDD to red...

Posted by ConeyLiu <gi...@git.apache.org>.
Github user ConeyLiu commented on the issue:

    https://github.com/apache/spark/pull/17936
  
    Hi @viirya, can you help to review this? I thinks you are familiar with this, because you dad tried to solve it before.
    
    And also ping @srowen , @mridulm, @jerryshao.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17936: [SPARK-20638][Core]Optimize the CartesianRDD to reduce r...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/17936
  
    > ... will re-fetch the data from remote even the data has been fetched by another local node task ...
    
    This is a good point, we should improve it, but I don't think relying on block manager is a good idea:
    1. the memory used here is actually execution memory, not storage memory. (different spilling priority)
    2. block manager flushes data to disk with partition granularity, i.e. it will flush the whole partition to disk for memory outage, while it's better to have record granularity.
    
    Maybe we can use a hash map to reuse already fetched partitions in `UnsafeCartesianRDD` and see how it goes, and then apply similar optimization in `CartesianRDD`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17936: [SPARK-20638][Core][WIP]Optimize the CartesianRDD to red...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17936
  
    **[Test build #3708 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/3708/testReport)** for PR 17936 at commit [`08c1849`](https://github.com/apache/spark/commit/08c1849e7fbc3edd6d4c5c60e2fab93e9cb1ee03).
     * This patch **fails from timeout after a configured wait of \`250m\`**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17936: [SPARK-20638][Core]Optimize the CartesianRDD to reduce r...

Posted by ConeyLiu <gi...@git.apache.org>.
Github user ConeyLiu commented on the issue:

    https://github.com/apache/spark/pull/17936
  
    Yeah, I think I can do the performance comparison.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17936: [SPARK-20638][Core]Optimize the CartesianRDD to reduce r...

Posted by ConeyLiu <gi...@git.apache.org>.
Github user ConeyLiu commented on the issue:

    https://github.com/apache/spark/pull/17936
  
    You are saying to use `MemoryAndDiskArray` cached data?`UnsafeCartesianRDD` also use `ExternalAppendOnlyUnsafeRowArray` to caching data. But in that implementation, we need fetch data for each task.  However, we only fetch data per executor when cache data to `BlockManager`. 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17936: [SPARK-20638][Core][WIP]Optimize the CartesianRDD to red...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17936
  
    Can one of the admins verify this patch?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17936: [SPARK-20638][Core]Optimize the CartesianRDD to reduce r...

Posted by ConeyLiu <gi...@git.apache.org>.
Github user ConeyLiu commented on the issue:

    https://github.com/apache/spark/pull/17936
  
    rdd1.cartesian(rdd2). For each task we need pool all the data of rdd1 (or rdd2) from the cluster. If we have n task running parallel in the same executor, that means we need duplicate poll n same data to same executor. This can reduce the gc problem and network I/O (maybe disk I/O if the memory and disk 
     array can't fit it in memory totally).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #17936: [SPARK-20638][Core]Optimize the CartesianRDD to r...

Posted by ConeyLiu <gi...@git.apache.org>.
Github user ConeyLiu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17936#discussion_r117432268
  
    --- Diff: core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala ---
    @@ -71,9 +72,92 @@ class CartesianRDD[T: ClassTag, U: ClassTag](
       }
     
       override def compute(split: Partition, context: TaskContext): Iterator[(T, U)] = {
    +    val blockManager = SparkEnv.get.blockManager
         val currSplit = split.asInstanceOf[CartesianPartition]
    -    for (x <- rdd1.iterator(currSplit.s1, context);
    -         y <- rdd2.iterator(currSplit.s2, context)) yield (x, y)
    +    val blockId2 = RDDBlockId(rdd2.id, currSplit.s2.index)
    +    var cachedInLocal = false
    +    var holdReadLock = false
    +
    +    // Try to get data from the local, otherwise it will be cached to the local.
    +    def getOrElseCache(
    --- End diff --
    
    Ok, I will change it too.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17936: [SPARK-20638][Core]Optimize the CartesianRDD to reduce r...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on the issue:

    https://github.com/apache/spark/pull/17936
  
    @ConeyLiu , I would suggest to add a flag cartesianRDD to specify whether local cache should be enabled. User could choose to enable it or not. Besides, if cache into BlockManager is failed, can we offload to original cartesian computation, so that the task will not be failed.
    
    
    



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17936: [SPARK-20638][Core]Optimize the CartesianRDD to reduce r...

Posted by suyanNone <gi...@git.apache.org>.
Github user suyanNone commented on the issue:

    https://github.com/apache/spark/pull/17936
  
    So careless to notice UnsafeCartesianRDD's ExternalAppendOnlyUnsafeRowArray, that nice, I am not read all discussion here...the solution unify with unsafeCartesionRDD already have a big improvement for CartesionRDD, and it seams more simple and easy to understand... (In our inner change, we adopt a memory and disk array to store graphx Array[EdgeAttr])...  I not sure it will have a strong optimize requirement to avoid per task locality...


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17936: [SPARK-20638][Core]Optimize the CartesianRDD to reduce r...

Posted by jiangxb1987 <gi...@git.apache.org>.
Github user jiangxb1987 commented on the issue:

    https://github.com/apache/spark/pull/17936
  
    I'm going to close this PR because it goes stale, please feel free to reopen it or open another PR if anyone have more thoughts on this issue.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17936: [SPARK-20638][Core]Optimize the CartesianRDD to reduce r...

Posted by jtengyp <gi...@git.apache.org>.
Github user jtengyp commented on the issue:

    https://github.com/apache/spark/pull/17936
  
    I think  you@ConeyLiu  should directly test the Cartesian phase with the following patch.
    
    val user = model.userFeatures
    val item = model.productFeatures
    val start = System.nanoTime()
    val rate = user.cartesian(item)
    println(rate.count())
    val time = (System.nanoTime() - start) / 1e9
    
    The recommendForAll in mllib ALS has been merged a new PR#17742. Your PR may not fit this case.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17936: [SPARK-20638][Core]Optimize the CartesianRDD to reduce r...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on the issue:

    https://github.com/apache/spark/pull/17936
  
    I see. I think at least we should make this cache mechanism controllable by flag. I'm guessing in some HPC clusters or single node cluster this problem is not so severe.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17936: [SPARK-20638][Core]Optimize the CartesianRDD to reduce r...

Posted by ConeyLiu <gi...@git.apache.org>.
Github user ConeyLiu commented on the issue:

    https://github.com/apache/spark/pull/17936
  
    I did not directly test this situation. But I have test the this pr compared with latest `ALS`(after merge #17742 ). In `ALS`, the both RDDs are cached, and also grouped the iterator(iterator.grouped). You can see the test result above, and the directly test I will give next week due to due to maintenance of server.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #17936: [SPARK-20638][Core]Optimize the CartesianRDD to r...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17936#discussion_r117423372
  
    --- Diff: core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala ---
    @@ -71,9 +72,92 @@ class CartesianRDD[T: ClassTag, U: ClassTag](
       }
     
       override def compute(split: Partition, context: TaskContext): Iterator[(T, U)] = {
    +    val blockManager = SparkEnv.get.blockManager
         val currSplit = split.asInstanceOf[CartesianPartition]
    -    for (x <- rdd1.iterator(currSplit.s1, context);
    -         y <- rdd2.iterator(currSplit.s2, context)) yield (x, y)
    +    val blockId2 = RDDBlockId(rdd2.id, currSplit.s2.index)
    +    var cachedInLocal = false
    +    var holdReadLock = false
    +
    +    // Try to get data from the local, otherwise it will be cached to the local.
    +    def getOrElseCache(
    +        rdd: RDD[U],
    +        partition: Partition,
    +        context: TaskContext,
    +        level: StorageLevel): Iterator[U] = {
    +      getLocalValues() match {
    +        case Some(result) =>
    +          return result
    +        case None => if (holdReadLock) {
    +          throw new SparkException(s"get() failed for block $blockId2 even though we held a lock")
    +        }
    +      }
    +
    +      val iterator = rdd.iterator(partition, context)
    +      if (rdd.getStorageLevel != StorageLevel.NONE || rdd.isCheckpointedAndMaterialized) {
    +        // If the block is cached in local, wo shouldn't cache it again.
    --- End diff --
    
    Even `getStorageLevel` is not `StorageLevel.NONE`, we still can't guarantee the block can be successfully cached.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17936: [SPARK-20638][Core]Optimize the CartesianRDD to reduce r...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/17936
  
    @jerryshao Yeah, the reason I mentioned caching is to know how much re-computing RDD costs in the performance. It seems to me that if re-computing is much more costing than transferring the data, only caching can be helpful.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #17936: [SPARK-20638][Core]Optimize the CartesianRDD to r...

Posted by ConeyLiu <gi...@git.apache.org>.
Github user ConeyLiu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17936#discussion_r117424223
  
    --- Diff: core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala ---
    @@ -71,9 +72,92 @@ class CartesianRDD[T: ClassTag, U: ClassTag](
       }
     
       override def compute(split: Partition, context: TaskContext): Iterator[(T, U)] = {
    +    val blockManager = SparkEnv.get.blockManager
         val currSplit = split.asInstanceOf[CartesianPartition]
    -    for (x <- rdd1.iterator(currSplit.s1, context);
    -         y <- rdd2.iterator(currSplit.s2, context)) yield (x, y)
    +    val blockId2 = RDDBlockId(rdd2.id, currSplit.s2.index)
    +    var cachedInLocal = false
    +    var holdReadLock = false
    +
    +    // Try to get data from the local, otherwise it will be cached to the local.
    +    def getOrElseCache(
    +        rdd: RDD[U],
    +        partition: Partition,
    +        context: TaskContext,
    +        level: StorageLevel): Iterator[U] = {
    +      getLocalValues() match {
    +        case Some(result) =>
    +          return result
    +        case None => if (holdReadLock) {
    +          throw new SparkException(s"get() failed for block $blockId2 even though we held a lock")
    +        }
    +      }
    +
    +      val iterator = rdd.iterator(partition, context)
    +      if (rdd.getStorageLevel != StorageLevel.NONE || rdd.isCheckpointedAndMaterialized) {
    +        // If the block is cached in local, wo shouldn't cache it again.
    --- End diff --
    
    @viirya Thanks for review. If the storeage level of rdd2 is not `StorageLevel.NONE`, it will cached by in the method `RDD.getOrCompute`. So I think we should cache it again, because the `blockManger.getOrElseUpdate` call the same method as `blockManager.putIterator`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17936: [SPARK-20638][Core]Optimize the CartesianRDD to reduce r...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/17936
  
    How much difference this performs, compared with caching the two RDDs before doing cartesian?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17936: [SPARK-20638][Core][WIP]Optimize the CartesianRDD to red...

Posted by ConeyLiu <gi...@git.apache.org>.
Github user ConeyLiu commented on the issue:

    https://github.com/apache/spark/pull/17936
  
    Cool, you see the `iterator` operation can be divided in two cases: 
      1. get the block from local, this case is very good.
      2. get the block from remote.
           - The block is cached in remote. So we should get it through network. (*NetWork IO*)
           - The block need recalculate. This ways we need repeatedly recalculate. Firstly, read the block from disk, then transmit it through network, then calculate. (*Disk IO*, *NetWork IO*, *Waste of computing resources*)
    
    For your question answers:
        1. If the block is cached before, we don't cache it again.
        2. If several task need the same block, they need wait. Because we only can one write lock for the same block.
        3. In `shuffle` case. For `reduce` phase, get one block  we need to read data from the disk, network transmission and calculation.  And these times are determined by the upper loop.  And the middle result generated by `map` is present at all nodes, so this overhead is high.
    
    And also, this patch has some insufficient. The highest case is delete the cached block after the `TaskSet` finished, because the block may be used by the next task. However, there is not a api the access `DAGScheduler` or other relate (maybe I miss some). So in this patch we remove the cached  
     block only if the block is not locked. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org