Posted to issues@spark.apache.org by "Daniel Darabos (JIRA)" <ji...@apache.org> on 2016/01/05 17:18:39 UTC

[jira] [Commented] (SPARK-11293) Spillable collections leak shuffle memory

    [ https://issues.apache.org/jira/browse/SPARK-11293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15083311#comment-15083311 ] 

Daniel Darabos commented on SPARK-11293:
----------------------------------------

I have a somewhat contrived example that still leaks in 1.6.0. I started {{spark-shell --master 'local-cluster[2,2,1024]'}} and ran:

{code}
sc.parallelize(0 to 10000000, 2).
  map(x => x % 10000 -> x).
  groupByKey.
  asInstanceOf[org.apache.spark.rdd.ShuffledRDD[Int, Int, Iterable[Int]]].
  setKeyOrdering(implicitly[Ordering[Int]]).
  mapPartitions { it => it.take(1) }.
  collect
{code}

I added extra logging around task memory acquisition and release so I could see what is not released. These are the logs:

{code}
16/01/05 17:02:45 INFO Executor: Running task 0.0 in stage 13.0 (TID 24)
16/01/05 17:02:45 INFO MapOutputTrackerWorker: Updating epoch to 7 and clearing cache
16/01/05 17:02:45 INFO TorrentBroadcast: Started reading broadcast variable 13
16/01/05 17:02:45 INFO MemoryStore: Block broadcast_13_piece0 stored as bytes in memory (estimated size 2.3 KB, free 7.6 KB)
16/01/05 17:02:45 INFO TorrentBroadcast: Reading broadcast variable 13 took 6 ms
16/01/05 17:02:45 INFO MemoryStore: Block broadcast_13 stored as values in memory (estimated size 4.5 KB, free 12.1 KB)
16/01/05 17:02:45 INFO MapOutputTrackerWorker: Don't have map outputs for shuffle 6, fetching them
16/01/05 17:02:45 INFO MapOutputTrackerWorker: Doing the fetch; tracker endpoint = NettyRpcEndpointRef(spark://MapOutputTracker@192.168.0.32:55147)
16/01/05 17:02:45 INFO MapOutputTrackerWorker: Got the output locations
16/01/05 17:02:45 INFO ShuffleBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks
16/01/05 17:02:45 INFO ShuffleBlockFetcherIterator: Started 1 remote fetches in 1 ms
16/01/05 17:02:45 ERROR TaskMemoryManager: Task 24 acquire 5.0 MB for null
16/01/05 17:02:45 ERROR TaskMemoryManager: Stack trace:
java.lang.Exception: here
	at org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:187)
	at org.apache.spark.util.collection.Spillable$class.maybeSpill(Spillable.scala:82)
	at org.apache.spark.util.collection.ExternalAppendOnlyMap.maybeSpill(ExternalAppendOnlyMap.scala:55)
	at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:158)
	at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:45)
	at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:89)
	at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:98)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
	at org.apache.spark.scheduler.Task.run(Task.scala:89)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)
16/01/05 17:02:47 ERROR TaskMemoryManager: Task 24 acquire 15.0 MB for null
16/01/05 17:02:47 ERROR TaskMemoryManager: Stack trace:
java.lang.Exception: here
	at org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:187)
	at org.apache.spark.util.collection.Spillable$class.maybeSpill(Spillable.scala:82)
	at org.apache.spark.util.collection.ExternalAppendOnlyMap.maybeSpill(ExternalAppendOnlyMap.scala:55)
	at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:158)
	at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:45)
	at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:89)
	at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:98)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
	at org.apache.spark.scheduler.Task.run(Task.scala:89)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)
16/01/05 17:02:49 ERROR TaskMemoryManager: Task 24 acquire 5.0 MB for null
16/01/05 17:02:49 ERROR TaskMemoryManager: Stack trace:
java.lang.Exception: here
	at org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:187)
	at org.apache.spark.util.collection.Spillable$class.maybeSpill(Spillable.scala:82)
	at org.apache.spark.util.collection.ExternalSorter.maybeSpill(ExternalSorter.scala:89)
	at org.apache.spark.util.collection.ExternalSorter.maybeSpillCollection(ExternalSorter.scala:220)
	at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:201)
	at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:103)
	at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:98)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
	at org.apache.spark.scheduler.Task.run(Task.scala:89)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)
16/01/05 17:02:49 ERROR TaskMemoryManager: Task 24 acquire 10.5 MB for null
16/01/05 17:02:49 ERROR TaskMemoryManager: Stack trace:
java.lang.Exception: here
	at org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:187)
	at org.apache.spark.util.collection.Spillable$class.maybeSpill(Spillable.scala:82)
	at org.apache.spark.util.collection.ExternalSorter.maybeSpill(ExternalSorter.scala:89)
	at org.apache.spark.util.collection.ExternalSorter.maybeSpillCollection(ExternalSorter.scala:220)
	at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:201)
	at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:103)
	at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:98)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
	at org.apache.spark.scheduler.Task.run(Task.scala:89)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)
16/01/05 17:02:49 ERROR TaskMemoryManager: Task 24 release 20.0 MB from null
16/01/05 17:02:49 ERROR TaskMemoryManager: Stack trace:
java.lang.Exception: here
	at org.apache.spark.memory.TaskMemoryManager.releaseExecutionMemory(TaskMemoryManager.java:197)
	at org.apache.spark.util.collection.Spillable$class.releaseMemory(Spillable.scala:111)
	at org.apache.spark.util.collection.ExternalAppendOnlyMap.releaseMemory(ExternalAppendOnlyMap.scala:55)
	at org.apache.spark.util.collection.ExternalAppendOnlyMap.org$apache$spark$util$collection$ExternalAppendOnlyMap$$freeCurrentMap(ExternalAppendOnlyMap.scala:259)
	at org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$iterator$1.apply$mcV$sp(ExternalAppendOnlyMap.scala:251)
	at org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:46)
	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:35)
	at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:197)
	at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:103)
	at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:98)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
	at org.apache.spark.scheduler.Task.run(Task.scala:89)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)
16/01/05 17:02:49 ERROR Executor: Managed memory leak detected; size = 16259594 bytes, TID = 24
{code}
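
As a rough check, the acquire and release lines in the log can be tallied per collection. The sketch below is only bookkeeping over the numbers printed above: the object and method names are made up for illustration, the collection for each entry is read off the stack trace under it, and it assumes the logged MB values are binary megabytes (1024 * 1024 bytes) rounded to one decimal.

```scala
object LeakTally {
  val BytesPerMb: Long = 1024L * 1024L

  // (collection, MB) pairs copied from the "acquire" lines in the log above.
  val acquiredMb: Seq[(String, Double)] = Seq(
    "ExternalAppendOnlyMap" -> 5.0,
    "ExternalAppendOnlyMap" -> 15.0,
    "ExternalSorter" -> 5.0,
    "ExternalSorter" -> 10.5
  )

  // The single "release" line in the log above.
  val releasedMb: Seq[(String, Double)] = Seq(
    "ExternalAppendOnlyMap" -> 20.0
  )

  // Memory still held by one collection: acquired minus released, in MB.
  def netMb(name: String): Double = {
    val acquired = acquiredMb.filter(_._1 == name).map(_._2).sum
    val released = releasedMb.filter(_._1 == name).map(_._2).sum
    acquired - released
  }

  // Size from the "Managed memory leak detected" line, converted to MB.
  val reportedLeakMb: Double = 16259594L.toDouble / BytesPerMb

  def main(args: Array[String]): Unit = {
    println(f"ExternalAppendOnlyMap still held: ${netMb("ExternalAppendOnlyMap")}%.1f MB")
    println(f"ExternalSorter still held:        ${netMb("ExternalSorter")}%.1f MB")
    println(f"Reported leak:                    $reportedLeakMb%.2f MB")
  }
}
```

Under these assumptions the map's acquisitions are fully released, while the sorter's acquisitions are not, and the sorter's total is close to the reported leak size.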

The issue is that {{ExternalSorter.stop()}} is only called by {{CompletionIterator}} if the iterator is iterated through to the end. But here we only take the first element.
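
The behaviour described above can be illustrated without Spark. The sketch below uses a hypothetical {{CompletionIter}} class as a simplified stand-in for Spark's {{CompletionIterator}} (it is not the real implementation): the cleanup callback runs only when {{hasNext}} first returns false, so a consumer that stops early never triggers it.

```scala
// Simplified stand-in for a completion iterator: `onComplete` runs once,
// the first time the underlying iterator reports that it is exhausted.
class CompletionIter[A](underlying: Iterator[A], onComplete: () => Unit)
    extends Iterator[A] {
  private var completed = false

  override def hasNext: Boolean = {
    val more = underlying.hasNext
    if (!more && !completed) {
      completed = true
      onComplete()
    }
    more
  }

  override def next(): A = underlying.next()
}

object EarlyStopDemo {
  // Consumes either the first `n` elements (Some(n)) or everything (None)
  // and reports whether the cleanup callback ran.
  def cleanupRan(takeN: Option[Int]): Boolean = {
    var released = false
    val it = new CompletionIter(Iterator(1, 2, 3), () => released = true)
    takeN match {
      case Some(n) => it.take(n).foreach(_ => ())
      case None    => it.foreach(_ => ())
    }
    released
  }

  def main(args: Array[String]): Unit = {
    println(s"drained fully: cleanup ran = ${cleanupRan(None)}")
    println(s"take(1):       cleanup ran = ${cleanupRan(Some(1))}")
  }
}
```

In the real task the callback is what would release the sorter's memory, so the {{take(1)}} case corresponds to the leak in the logs.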

In practice this happens to us in a {{zipPartitions}} call where we do not iterate both iterators to the end. (It's a kind of join.)

Is it illegal to stop iterating an RDD iterator before the end? I don't think it is: {{RDD.take}} stops short as well. The issue can probably be reproduced with {{RDD.take}} too, although I tried and failed.

> Spillable collections leak shuffle memory
> -----------------------------------------
>
>                 Key: SPARK-11293
>                 URL: https://issues.apache.org/jira/browse/SPARK-11293
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.3.1, 1.4.1, 1.5.1
>            Reporter: Josh Rosen
>            Assignee: Josh Rosen
>            Priority: Critical
>             Fix For: 1.6.0
>
>
> I discovered multiple leaks of shuffle memory while working on my memory manager consolidation patch, which added the ability to do strict memory leak detection for the bookkeeping that used to be performed by the ShuffleMemoryManager. This uncovered a handful of places where tasks can acquire execution/shuffle memory but never release it, starving themselves of memory.
> Problems that I found:
> * {{ExternalSorter.stop()}} should release the sorter's shuffle/execution memory.
> * BlockStoreShuffleReader should call {{ExternalSorter.stop()}} using a {{CompletionIterator}}.
> * {{ExternalAppendOnlyMap}} exposes no equivalent of {{stop()}} for freeing its resources.
