Posted to issues@spark.apache.org by "Dong Wang (Jira)" <ji...@apache.org> on 2019/11/11 12:24:00 UTC

[jira] [Created] (SPARK-29844) Wrong unpersist strategy in ml.recommendation.ALS.train

Dong Wang created SPARK-29844:
---------------------------------

             Summary: Wrong unpersist strategy in ml.recommendation.ALS.train
                 Key: SPARK-29844
                 URL: https://issues.apache.org/jira/browse/SPARK-29844
             Project: Spark
          Issue Type: Improvement
          Components: ML
    Affects Versions: 3.0.0
            Reporter: Dong Wang


In ml.recommendation.ALS.train(), there are many intermediate RDDs. At the end of the method, unpersist() is invoked on these RDDs, but the timing of the calls is wrong, which causes recomputation and wasted memory.
{code:scala}
    val userIdAndFactors = userInBlocks
      .mapValues(_.srcIds)
      .join(userFactors)
      .mapPartitions({ items =>
        items.flatMap { case (_, (ids, factors)) =>
          ids.view.zip(factors)
        }
      // Preserve the partitioning because IDs are consistent with the partitioners in userInBlocks
      // and userFactors.
      }, preservesPartitioning = true)
      .setName("userFactors")
      .persist(finalRDDStorageLevel) // Missing unpersist, but hard to fix
    val itemIdAndFactors = itemInBlocks
      .mapValues(_.srcIds)
      .join(itemFactors)
      .mapPartitions({ items =>
        items.flatMap { case (_, (ids, factors)) =>
          ids.view.zip(factors)
        }
      }, preservesPartitioning = true)
      .setName("itemFactors")
      .persist(finalRDDStorageLevel) // Missing unpersist, but hard to fix
    if (finalRDDStorageLevel != StorageLevel.NONE) {
      userIdAndFactors.count()
      itemFactors.unpersist() // Premature unpersist
      itemIdAndFactors.count()
      userInBlocks.unpersist() // Lagging unpersist
      userOutBlocks.unpersist() // Lagging unpersist
      itemInBlocks.unpersist() 
      itemOutBlocks.unpersist() // Lagging unpersist
      blockRatings.unpersist() // Lagging unpersist
    }
    (userIdAndFactors, itemIdAndFactors)
  }
{code}

1. itemFactors is unpersisted too early. itemIdAndFactors.count() still uses itemFactors, so itemFactors will be recomputed.
2. userInBlocks, userOutBlocks, itemOutBlocks, and blockRatings are unpersisted too late. The final action, itemIdAndFactors.count(), does not use these RDDs, so they can be unpersisted before it to save memory.
By the way, itemIdAndFactors is persisted here but will never be unpersisted until the application ends. It may hurt performance, but I think it's hard to fix.
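
Following the two points above, the final block could be reordered roughly as below. This is only a sketch of one possible ordering, not a tested patch. It is a fragment meant to replace the if block in the excerpt, so it reuses the names from that excerpt and does not compile on its own. It also assumes, as point 2 states, that itemIdAndFactors.count() does not need userInBlocks, userOutBlocks, itemOutBlocks, or blockRatings once itemFactors is cached.
{code:scala}
    if (finalRDDStorageLevel != StorageLevel.NONE) {
      // Materialize the user-side result first.
      userIdAndFactors.count()
      // Per point 2, the remaining action does not read these RDDs,
      // so release them before it runs instead of after.
      userInBlocks.unpersist()
      userOutBlocks.unpersist()
      itemOutBlocks.unpersist()
      blockRatings.unpersist()
      // itemFactors and itemInBlocks are still cached here, so this
      // action reads them from the cache rather than recomputing them.
      itemIdAndFactors.count()
      // Only now are the inputs of itemIdAndFactors safe to release.
      itemFactors.unpersist()
      itemInBlocks.unpersist()
    }
    // The two returned RDDs stay persisted; releasing them is left to the caller.
    (userIdAndFactors, itemIdAndFactors)
{code}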

This issue was reported by our tool CacheCheck, which dynamically detects persist()/unpersist() API misuses.


