Posted to user@spark.apache.org by ANDREA SPINA <74...@studenti.unimore.it> on 2016/09/01 09:49:38 UTC

java.lang.OutOfMemoryError Spark MLlib ALS matrix factorization

Hello everyone.
I'm running the Apache Spark MLlib ALS matrix factorization and I ran into
the following exceptions.

The failure is periodic: it starts on the first iteration with the OOM
error, followed by a long series of FileNotFoundExceptions while stages are
resubmitted (according to the UI, stage 12.0 is the first iteration stage).
After 4 retries the job fails and is aborted.
16/08/31 23:53:03 WARN TaskSetManager: Lost task 12.0 in stage 2.0 (TID
3312, cloud-15): java.lang.OutOfMemoryError: GC overhead limit exceeded
        at java.lang.Integer.valueOf(Integer.java:832)
        at scala.runtime.BoxesRunTime.boxToInteger(BoxesRunTime.java:70)
        at scala.runtime.ScalaRunTime$.array_apply(ScalaRunTime.scala:73)
        at
org.apache.spark.ml.recommendation.ALS$UncompressedInBlockSort.copyElement(ALS.scala:1059)
        at
org.apache.spark.ml.recommendation.ALS$UncompressedInBlockSort.copyElement(ALS.scala:1000)
        at
org.apache.spark.util.collection.TimSort$SortState.mergeLo(TimSort.java:735)
        at
org.apache.spark.util.collection.TimSort$SortState.mergeAt(TimSort.java:525)
        at
org.apache.spark.util.collection.TimSort$SortState.mergeCollapse(TimSort.java:453)
        at
org.apache.spark.util.collection.TimSort$SortState.access$200(TimSort.java:325)
        at org.apache.spark.util.collection.TimSort.sort(TimSort.java:153)
        at org.apache.spark.util.collection.Sorter.sort(Sorter.scala:37)
        at
org.apache.spark.ml.recommendation.ALS$UncompressedInBlock.org$apache$spark$ml$recommendation$ALS$UncompressedInBlock$$sort(ALS.scala:971)
        at
org.apache.spark.ml.recommendation.ALS$UncompressedInBlock.compress(ALS.scala:929)
        at
org.apache.spark.ml.recommendation.ALS$$anonfun$15.apply(ALS.scala:1114)
        at
org.apache.spark.ml.recommendation.ALS$$anonfun$15.apply(ALS.scala:1108)
        at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$43$$anonfun$apply$44.apply(PairRDDFunctions.scala:755)
        at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$43$$anonfun$apply$44.apply(PairRDDFunctions.scala:755)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at
org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:285)
        at
org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
        at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
        at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
        at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
        at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

org.apache.spark.shuffle.FetchFailedException: Error in opening
FileSegmentManagedBuffer{file=/data/1/peel/spark/tmp/spark-3009db19-3f4a-43ae-825a-e241b533aaf9/executor-5fb32216-a9dd-4e62-b6aa-1e2d0a5910b1/blockmgr-3c845e0e-b832-42c5-8cf3-555d0c3542c0/02/shuffle_0_488_0.data,
offset=5194506, length=48945}
        at
org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:323)
        at
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:307)
        at
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:51)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
        at
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
        at
org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:152)
        at
org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:45)
        at
org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:89)
        at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:98)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
        at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
        at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
        at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Error in opening
FileSegmentManagedBuffer{file=/data/1/peel/spark/tmp/spark-3009db19-3f4a-43ae-825a-e241b533aaf9/executor-5fb32216-a9dd-4e62-b6aa-1e2d0a5910b1/blockmgr-3c845e0e-b832-42c5-8cf3-555d0c3542c0/02/shuffle_0_488_0.data,
offset=5194506, length=48945}
        at
org.apache.spark.network.buffer.FileSegmentManagedBuffer.createInputStream(FileSegmentManagedBuffer.java:113)
        at
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:304)
        ... 26 more
Caused by: java.io.FileNotFoundException:
/data/1/peel/spark/tmp/spark-3009db19-3f4a-43ae-825a-e241b533aaf9/executor-5fb32216-a9dd-4e62-b6aa-1e2d0a5910b1/blockmgr-3c845e0e-b832-42c5-8cf3-555d0c3542c0/02/shuffle_0_488_0.data
(No such file or directory)
        at java.io.FileInputStream.open0(Native Method)
        at java.io.FileInputStream.open(FileInputStream.java:195)
        at java.io.FileInputStream.<init>(FileInputStream.java:138)
        at
org.apache.spark.network.buffer.FileSegmentManagedBuffer.createInputStream(FileSegmentManagedBuffer.java:98)
        ... 27 more

16/09/01 00:54:51 WARN TaskSetManager: Lost task 304.0 in stage 12.0 (TID
9286, cloud-22): java.lang.OutOfMemoryError: Java heap space
        at
java.io.ObjectOutputStream$HandleTable.growEntries(ObjectOutputStream.java:2347)
        at
java.io.ObjectOutputStream$HandleTable.assign(ObjectOutputStream.java:2276)
        at
java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1323)
        at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
        at
java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
        at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
        at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at
java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
        at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
        at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at
java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
        at
org.apache.spark.serializer.SerializationStream.writeValue(Serializer.scala:147)
        at
org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:185)
        at
org.apache.spark.util.collection.ExternalAppendOnlyMap.spill(ExternalAppendOnlyMap.scala:206)
        at
org.apache.spark.util.collection.ExternalAppendOnlyMap.spill(ExternalAppendOnlyMap.scala:55)
        at
org.apache.spark.util.collection.Spillable$class.maybeSpill(Spillable.scala:93)
        at
org.apache.spark.util.collection.ExternalAppendOnlyMap.maybeSpill(ExternalAppendOnlyMap.scala:55)
        at
org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:158)
        at
org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:45)
        at
org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:89)
        at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:98)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:140)
        at
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:136)

16/09/01 00:55:20 INFO TaskSetManager: Starting task 375.0 in stage 12.0
(TID 9360, cloud-15, partition 375,PROCESS_LOCAL, 2268 bytes)
16/09/01 00:55:20 INFO TaskSetManager: Finished task 91.0 in stage 12.0
(TID 9093) in 1356978 ms on cloud-15 (120/720)
16/09/01 00:56:07 INFO TaskSetManager: Starting task 384.0 in stage 12.0
(TID 9361, cloud-24, partition 384,PROCESS_LOCAL, 2268 bytes)
16/09/01 00:56:07 WARN TaskSetManager: Lost task 18.0 in stage 12.0 (TID
9027, cloud-24): FetchFailed(BlockManagerId(2, cloud-22, 40528),
shuffleId=22, mapId=671, reduceId=18, message=
org.apache.spark.shuffle.FetchFailedException:
java.io.FileNotFoundException:
/data/1/peel/spark/tmp/spark-6225354a-22f0-45dd-aff0-76051ad609ed/executor-d5fbc621-341c-4fc9-bedc-c292dc7f038a/blockmgr-c8b40f38-99a9-4060-823d-50b502bd9f91/25/shuffle_22_671_0.index
(No such file or directory)
        at java.io.FileInputStream.open0(Native Method)
        at java.io.FileInputStream.open(FileInputStream.java:195)
        at java.io.FileInputStream.<init>(FileInputStream.java:138)
        at
org.apache.spark.shuffle.IndexShuffleBlockResolver.getBlockData(IndexShuffleBlockResolver.scala:191)
        at
org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:298)
        at
org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:58)
        at
org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:58)
        at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at
scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
        at
scala.collection.TraversableLike$class.map(TraversableLike.scala:244)

I'm running spark-1.6.2, and I really can't figure out the reason behind
these failures.

My code simply calls the library as follows:

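import org.apache.spark.mllib.recommendation.ALS

// Configure the RDD-based MLlib ALS solver
val als = new ALS()
    .setIntermediateRDDStorageLevel(storageLevel)
    .setBlocks(numTasks)   // same number of user and product blocks
    .setLambda(0.1)        // regularization parameter
    .setRank(50)           // number of latent factors
    .setIterations(10)
    .setSeed(42)

// Train the model, save it, then shut down the context
val model = als.run(ratings)

model.save(sc, outputPath)

sc.stop()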

where
- *ratings*, the input RDD (parallelized with *numTasks* partitions),
contains (uid, iid, rate) rows for about 8e6 users, 1e6 items and about
5.6e9 ratings (700 per user on average)
- *numTasks* is currently 240 * 3 (= numOfCores * 3)
- *storageLevel* is MEMORY_AND_DISK
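
For a rough sense of scale, the figures above can be turned into a
back-of-the-envelope estimate. This is only a sketch: the object name
AlsSizing, the 12 bytes per rating (two Int ids plus a Float value, with no
JVM object or boxing overhead) and the 4-byte factor entries are assumptions
made for illustration, not values taken from Spark or from the job itself.

```scala
// Back-of-the-envelope sizing from the figures quoted in the post.
// Assumptions (not from the post): 12 bytes per rating, 4-byte Float factors.
object AlsSizing {
  val numUsers: Long = 8000000L        // about 8e6 users
  val numItems: Long = 1000000L        // about 1e6 items
  val numRatings: Long = 5600000000L   // about 5.6e9 ratings
  val numBlocks: Int = 240 * 3         // numTasks = numOfCores * 3
  val rank: Int = 50                   // value passed to setRank

  // Assumed packed size: Int user id + Int item id + Float rating.
  val bytesPerRating: Long = 12L

  def ratingsPerUser: Long = numRatings / numUsers
  def ratingsPerBlock: Long = numRatings / numBlocks
  def rawRatingBytes: Long = numRatings * bytesPerRating

  // Size of a dense factor matrix with n rows and `rank` Float columns.
  def factorBytes(n: Long): Long = n * rank * 4L

  def toGiB(bytes: Long): Double = bytes.toDouble / (1L << 30)

  def main(args: Array[String]): Unit = {
    println(s"ratings per user:   $ratingsPerUser")
    println(s"ratings per block:  $ratingsPerBlock")
    println(f"raw ratings:        ${toGiB(rawRatingBytes)}%.1f GiB")
    println(f"user factors:       ${toGiB(factorBytes(numUsers))}%.2f GiB")
    println(f"item factors:       ${toGiB(factorBytes(numItems))}%.2f GiB")
  }
}
```

Under these assumptions each of the 720 blocks holds several million
ratings before any per-object overhead is counted, so the real in-memory
footprint is likely to be noticeably larger than the raw figure.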

I have tried the following:
- lowering the number of blocks: (1) numTasks, (2) 240 (numOfCores), (3)
letting the MLlib implementation set it
- changing the storage level to MEMORY_ONLY

I would also try varying spark.shuffle.memoryFraction, but I read that it
has been deprecated since Spark 1.6.
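
For context on that deprecation: Spark 1.6 replaced the separate shuffle and
storage fractions with a unified memory region controlled by
spark.memory.fraction and spark.memory.storageFraction, and the old settings
are only honored when spark.memory.useLegacyMode is true. The sketch below
estimates how a 28672m executor heap would be split under that model. It
assumes the documented 1.6 defaults (0.75 and 0.5) and a 300 MB reservation;
the object and method names are hypothetical, and the heap the JVM actually
reports is somewhat smaller than the configured value.

```scala
// Rough estimate of the unified memory region in Spark 1.6.
// Assumed values: 300 MB reserved, spark.memory.fraction = 0.75,
// spark.memory.storageFraction = 0.5 (the documented 1.6 defaults).
object UnifiedMemoryEstimate {
  val executorHeapMb: Double = 28672.0 // spark.executor.memory = 28672m
  val reservedMb: Double = 300.0       // assumed fixed reservation
  val memoryFraction: Double = 0.75    // assumed spark.memory.fraction
  val storageFraction: Double = 0.5    // assumed spark.memory.storageFraction

  // Memory shared by execution (shuffles, sorts, aggregation) and storage.
  def unifiedMb(heapMb: Double, fraction: Double): Double =
    (heapMb - reservedMb) * fraction

  // Part of the unified region in which cached blocks are protected
  // from eviction by execution.
  def storageMb(heapMb: Double, fraction: Double, storage: Double): Double =
    unifiedMb(heapMb, fraction) * storage

  def main(args: Array[String]): Unit = {
    val unified = unifiedMb(executorHeapMb, memoryFraction)
    val storage = storageMb(executorHeapMb, memoryFraction, storageFraction)
    println(f"unified region:    $unified%.1f MB")
    println(f"protected storage: $storage%.1f MB")
    println(f"left for user objects and internals: ${executorHeapMb - reservedMb - unified}%.1f MB")
  }
}
```

Changing memoryFraction in the sketch shows how much heap would move between
Spark-managed memory and the space left for user objects.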

I'm running on a 15-node cluster (16 CPUs and 32 GB of memory per node)
with the following relevant properties:

spark.executor.memory = 28672m
spark.driver.memory = 28672m
spark.deamon.memory = 28672m
spark.driver.maxResultSize = 0
spark.network.timeout = 3000s

Any help will be appreciated. Thank you.

-- 
*Andrea Spina*
N.Tessera: *74598*
MAT: *89369*
*Ingegneria Informatica* *[LM] *(D.M. 270)