Posted to issues@spark.apache.org by "Sean R. Owen (Jira)" <ji...@apache.org> on 2019/11/16 20:56:00 UTC

[jira] [Resolved] (SPARK-28781) Unnecessary persist in PeriodicCheckpointer.update()

     [ https://issues.apache.org/jira/browse/SPARK-28781?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sean R. Owen resolved SPARK-28781.
----------------------------------
    Resolution: Not A Problem

I think the point of this class is to manage RDDs that depend on each other, to break lineage, etc. They all need to be persisted so that they are not recomputed when child RDDs are materialized. My only question here is why it needs to hold 3 rather than 2, but that's a different issue.
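
To illustrate the recomputation point, here is a minimal sketch in plain Scala. It does not use Spark: LazyData, LineageSketch and parentComputations are hypothetical names made up for this example, and the class only mimics the idea of lazy evaluation with an optional cache.

```scala
object LineageSketch {
  // Toy stand-in for an RDD (not Spark's API): evaluation is lazy and the
  // result is kept only if persist() was called first.
  final class LazyData[A](compute: () => A) {
    private var persisted = false
    private var cached: Option[A] = None
    var computeCount = 0 // number of times `compute` actually ran

    def persist(): this.type = { persisted = true; this }

    def materialize(): A = cached match {
      case Some(value) => value
      case None =>
        computeCount += 1
        val value = compute()
        if (persisted) cached = Some(value)
        value
    }

    // The child re-materializes this parent each time it is evaluated.
    def map[B](f: A => B): LazyData[B] =
      new LazyData[B](() => f(materialize()))
  }

  // Materializes one child twice and reports how often the parent was computed.
  def parentComputations(persistParent: Boolean): Int = {
    val parent = new LazyData[Int](() => (1 to 5).sum)
    if (persistParent) parent.persist()
    val child = parent.map(_ * 2)
    child.materialize()
    child.materialize()
    parent.computeCount
  }
}
```

In this toy model, LineageSketch.parentComputations(persistParent = false) returns 2, while passing true returns 1: the persisted parent is computed once and then reused by the child.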

> Unnecessary persist in PeriodicCheckpointer.update()
> ----------------------------------------------------
>
>                 Key: SPARK-28781
>                 URL: https://issues.apache.org/jira/browse/SPARK-28781
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core
>    Affects Versions: 3.0.0
>            Reporter: Dong Wang
>            Priority: Major
>
> Once the function _update()_ is called, the RDD _newData_ is persisted at line 82. However, the persisted RDD _newData_ is used a second time only when the checkpoint condition (at line 94) is met, in the call to _checkpoint()_ at line 97. Otherwise, _newData_ is used only once, and persisting it is unnecessary in that case. Although persistedQueue is checked to limit the amount of unnecessarily cached data, it would be better to avoid every unnecessary persist operation.
> {code:scala}
> def update(newData: T): Unit = {
>     persist(newData)
>     persistedQueue.enqueue(newData)
>     // We try to maintain 2 Datasets in persistedQueue to support the semantics of this class:
>     // Users should call [[update()]] when a new Dataset has been created,
>     // before the Dataset has been materialized.
>     while (persistedQueue.size > 3) {
>       val dataToUnpersist = persistedQueue.dequeue()
>       unpersist(dataToUnpersist)
>     }
>     updateCount += 1
>     // Handle checkpointing (after persisting)
>     if (checkpointInterval != -1 && (updateCount % checkpointInterval) == 0
>       && sc.getCheckpointDir.nonEmpty) {
>       // Add new checkpoint before removing old checkpoints.
>       checkpoint(newData)
> {code}
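
The control flow quoted above can be sketched without Spark as follows. This is only an approximation under stated assumptions: CheckpointerSketch, maxPersisted and log are hypothetical names, strings stand in for RDDs, and the sc.getCheckpointDir.nonEmpty condition is left out.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the quoted update() logic; it records what would
// be persisted, unpersisted and checkpointed instead of calling Spark.
final class CheckpointerSketch(checkpointInterval: Int, maxPersisted: Int = 3) {
  private val persistedQueue = mutable.Queue.empty[String]
  private var updateCount = 0
  val log = mutable.ArrayBuffer.empty[String]

  def update(newData: String): Unit = {
    // Every new item is persisted, whether or not it is checkpointed later.
    log += s"persist $newData"
    persistedQueue.enqueue(newData)
    // Evict the oldest items once the queue exceeds its bound.
    while (persistedQueue.size > maxPersisted) {
      log += s"unpersist ${persistedQueue.dequeue()}"
    }
    updateCount += 1
    // Checkpoint only on every checkpointInterval-th update.
    if (checkpointInterval != -1 && updateCount % checkpointInterval == 0) {
      log += s"checkpoint $newData"
    }
  }

  def persisted: List[String] = persistedQueue.toList
}
```

For example, after creating new CheckpointerSketch(checkpointInterval = 2) and calling update with "a", "b", "c" and "d" in turn, persisted holds List("b", "c", "d"), and only "b" and "d" appear in checkpoint entries of log, although all four items were persisted.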
