Posted to issues@spark.apache.org by "Josh Rosen (JIRA)" <ji...@apache.org> on 2014/10/22 01:28:33 UTC

[jira] [Resolved] (SPARK-3517) mapPartitions is not correct clearing up the closure

     [ https://issues.apache.org/jira/browse/SPARK-3517?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Josh Rosen resolved SPARK-3517.
-------------------------------
    Resolution: Incomplete

Resolving this as "Incomplete" for now, since witgo was unable to reproduce this in his PR.  Feel free to re-open if you encounter this problem again.
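
For anyone trying to reproduce this outside a cluster, one rough check is to measure how many bytes a closure occupies under plain Java serialization, which is what Spark uses for closures as far as I know. The sketch below is only an illustration: ClosureSizeCheck, makeModel, capturing and notCapturing are hypothetical names, and the array is a stand-in for a large model, not the reporter's topicModel.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object ClosureSizeCheck {
  // Serialize an object with plain Java serialization and return the byte count.
  def serializedSize(obj: AnyRef): Int = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()
    bytes.size()
  }

  // Hypothetical stand-in for a large driver-side model (assumption, not from the report).
  def makeModel(n: Int): Array[Double] = Array.fill(n)(1.0)

  // This closure captures the whole array, so the array is serialized with it.
  def capturing(model: Array[Double]): Int => Double = i => model(i)

  // This closure captures only a single Double, so it stays small.
  def notCapturing(scale: Double): Int => Double = i => i * scale
}
```

If the closure passed to mapPartitions were dragging in the driver-side model rather than only the broadcast handle, its serialized size would be expected to grow with the model, as the capturing case does here.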

> mapPartitions is not correct clearing up the closure
> ----------------------------------------------------
>
>                 Key: SPARK-3517
>                 URL: https://issues.apache.org/jira/browse/SPARK-3517
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.0.2, 1.1.0
>            Reporter: Guoqiang Li
>            Priority: Blocker
>
> {code}
> for (iter <- 1 to totalIter) {
>   logInfo("Start Gibbs sampling (Iteration %d/%d)".format(iter, totalIter))
>   // A new broadcast of the current model is created on every iteration.
>   val broadcastModel = data.context.broadcast(topicModel)
>   val previousCorpus = corpus
>   corpus = corpus.mapPartitions { docs =>
>     val rand = new Random
>     // Local copy read from the broadcast; it shadows the driver-side topicModel.
>     val topicModel = broadcastModel.value
>     val topicThisTerm = BDV.zeros[Double](numTopics)
>     docs.map { doc =>
>       val content = doc.content
>       val topics = doc.topics
>       val topicsDist = doc.topicsDist
>       for (i <- 0 until content.length) {
>         val term = content(i)
>         val topic = topics(i)
>         val chosenTopic = topicModel.dropOneDistSampler(topicsDist, topicThisTerm,
>           rand, term, topic)
>         // Move one count from the old topic to the newly sampled topic.
>         if (topic != chosenTopic) {
>           topics(i) = chosenTopic
>           topicsDist(topic) += -1
>           topicsDist(chosenTopic) += 1
>           topicModel.update(term, topic, -1)
>           topicModel.update(term, chosenTopic, 1)
>         }
>       }
>       doc
>     }
>   }.setName(s"LDA-$iter").persist(StorageLevel.MEMORY_AND_DISK)
> }
> {code}
> The serialized corpus RDD and the serialized topicModel broadcast are almost the same size.
> Output of {{cat spark.log | grep 'stored as values in memory'}}:
> {noformat}
> .........
> 14/09/13 00:48:44 INFO MemoryStore: Block broadcast_9 stored as values in memory (estimated size 68.6 KB, free 2.8 GB)
> 14/09/13 00:48:45 INFO MemoryStore: Block broadcast_10 stored as values in memory (estimated size 41.7 KB, free 2.8 GB)
> 14/09/13 00:49:21 INFO MemoryStore: Block broadcast_11 stored as values in memory (estimated size 197.5 MB, free 2.6 GB)
> 14/09/13 00:49:24 INFO MemoryStore: Block broadcast_12 stored as values in memory (estimated size 197.7 MB, free 2.3 GB)
> 14/09/13 00:53:25 INFO MemoryStore: Block broadcast_13 stored as values in memory (estimated size 163.9 MB, free 2.1 GB)
> 14/09/13 00:53:28 INFO MemoryStore: Block broadcast_14 stored as values in memory (estimated size 164.0 MB, free 1878.0 MB)
> 14/09/13 00:57:34 INFO MemoryStore: Block broadcast_15 stored as values in memory (estimated size 149.7 MB, free 1658.5 MB)
> 14/09/13 00:57:36 INFO MemoryStore: Block broadcast_16 stored as values in memory (estimated size 150.0 MB, free 1444.0 MB)
> 14/09/13 01:01:34 INFO MemoryStore: Block broadcast_17 stored as values in memory (estimated size 141.1 MB, free 1238.3 MB)
> 14/09/13 01:01:36 INFO MemoryStore: Block broadcast_18 stored as values in memory (estimated size 141.2 MB, free 1036.2 MB)
> 14/09/13 01:05:12 INFO MemoryStore: Block broadcast_19 stored as values in memory (estimated size 134.5 MB, free 840.7 MB)
> 14/09/13 01:05:14 INFO MemoryStore: Block broadcast_20 stored as values in memory (estimated size 134.7 MB, free 647.8 MB)
> 14/09/13 01:08:39 INFO MemoryStore: Block broadcast_21 stored as values in memory (estimated size 218.3 KB, free 589.5 MB)
> 14/09/13 01:08:39 INFO MemoryStore: Block broadcast_22 stored as values in memory (estimated size 218.3 KB, free 589.2 MB)
> 14/09/13 01:08:40 INFO MemoryStore: Block broadcast_23 stored as values in memory (estimated size 134.6 MB, free 454.6 MB)
> 14/09/13 01:08:53 INFO MemoryStore: Block broadcast_24 stored as values in memory (estimated size 129.3 MB, free 267.1 MB)
> 14/09/13 01:08:55 INFO MemoryStore: Block broadcast_25 stored as values in memory (estimated size 129.4 MB, free 82.0 MB)
> {noformat}
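
To put numbers on the excerpt above, where free memory falls from 2.8 GB to 82.0 MB, the MemoryStore lines can be parsed and the estimated sizes summed. This is a minimal sketch: BroadcastLogSizes, parse and totalMb are hypothetical helpers, and the unit table assumes the log uses 1024-based units.

```scala
object BroadcastLogSizes {
  // Matches e.g. "Block broadcast_11 stored as values in memory (estimated size 197.5 MB, free 2.6 GB)".
  // Unanchored so the timestamp and logger prefix are ignored.
  private val Line =
    """Block (broadcast_\d+) stored as values in memory \(estimated size ([\d.]+) (B|KB|MB|GB|TB),""".r.unanchored

  // Conversion factors to MB, assuming 1024-based units (assumption).
  private val unitToMb: Map[String, Double] = Map(
    "B" -> 1.0 / (1024 * 1024),
    "KB" -> 1.0 / 1024,
    "MB" -> 1.0,
    "GB" -> 1024.0,
    "TB" -> 1024.0 * 1024
  )

  // Returns (block name, estimated size in MB) for a matching line, None otherwise.
  def parse(line: String): Option[(String, Double)] = line match {
    case Line(block, size, unit) => Some(block -> size.toDouble * unitToMb(unit))
    case _ => None
  }

  // Sum of the estimated sizes, in MB, over all matching lines.
  def totalMb(lines: Seq[String]): Double = lines.flatMap(parse).map(_._2).sum
}
```

Feeding the grep output into totalMb gives the cumulative size of the broadcast blocks held in memory, which can then be compared against the drop in the free figure.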


