Posted to user@spark.apache.org by ANDREA SPINA <74...@studenti.unimore.it> on 2016/09/01 09:49:38 UTC
java.lang.OutOfMemoryError Spark MLlib ALS matrix factorization
Hello everyone.
I'm running the Apache Spark MLlib ALS matrix factorization and I ran into
the following exceptions:
*The following exception is periodic: it starts on the first iteration with the OOM error and is followed by a long run of FileNotFoundException errors during stage resubmissions (according to the UI, stage 12.0 is the first iteration stage). After 4 retries the job fails and is aborted.*
16/08/31 23:53:03 WARN TaskSetManager: Lost task 12.0 in stage 2.0 (TID 3312, cloud-15): java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.lang.Integer.valueOf(Integer.java:832)
at scala.runtime.BoxesRunTime.boxToInteger(BoxesRunTime.java:70)
at scala.runtime.ScalaRunTime$.array_apply(ScalaRunTime.scala:73)
at org.apache.spark.ml.recommendation.ALS$UncompressedInBlockSort.copyElement(ALS.scala:1059)
at org.apache.spark.ml.recommendation.ALS$UncompressedInBlockSort.copyElement(ALS.scala:1000)
at org.apache.spark.util.collection.TimSort$SortState.mergeLo(TimSort.java:735)
at org.apache.spark.util.collection.TimSort$SortState.mergeAt(TimSort.java:525)
at org.apache.spark.util.collection.TimSort$SortState.mergeCollapse(TimSort.java:453)
at org.apache.spark.util.collection.TimSort$SortState.access$200(TimSort.java:325)
at org.apache.spark.util.collection.TimSort.sort(TimSort.java:153)
at org.apache.spark.util.collection.Sorter.sort(Sorter.scala:37)
at org.apache.spark.ml.recommendation.ALS$UncompressedInBlock.org$apache$spark$ml$recommendation$ALS$UncompressedInBlock$$sort(ALS.scala:971)
at org.apache.spark.ml.recommendation.ALS$UncompressedInBlock.compress(ALS.scala:929)
at org.apache.spark.ml.recommendation.ALS$$anonfun$15.apply(ALS.scala:1114)
at org.apache.spark.ml.recommendation.ALS$$anonfun$15.apply(ALS.scala:1108)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$43$$anonfun$apply$44.apply(PairRDDFunctions.scala:755)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$43$$anonfun$apply$44.apply(PairRDDFunctions.scala:755)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:285)
at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
org.apache.spark.shuffle.FetchFailedException: Error in opening FileSegmentManagedBuffer{file=/data/1/peel/spark/tmp/spark-3009db19-3f4a-43ae-825a-e241b533aaf9/executor-5fb32216-a9dd-4e62-b6aa-1e2d0a5910b1/blockmgr-3c845e0e-b832-42c5-8cf3-555d0c3542c0/02/shuffle_0_488_0.data, offset=5194506, length=48945}
at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:323)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:307)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:51)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:152)
at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:45)
at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:89)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:98)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Error in opening FileSegmentManagedBuffer{file=/data/1/peel/spark/tmp/spark-3009db19-3f4a-43ae-825a-e241b533aaf9/executor-5fb32216-a9dd-4e62-b6aa-1e2d0a5910b1/blockmgr-3c845e0e-b832-42c5-8cf3-555d0c3542c0/02/shuffle_0_488_0.data, offset=5194506, length=48945}
at org.apache.spark.network.buffer.FileSegmentManagedBuffer.createInputStream(FileSegmentManagedBuffer.java:113)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:304)
... 26 more
Caused by: java.io.FileNotFoundException: /data/1/peel/spark/tmp/spark-3009db19-3f4a-43ae-825a-e241b533aaf9/executor-5fb32216-a9dd-4e62-b6aa-1e2d0a5910b1/blockmgr-3c845e0e-b832-42c5-8cf3-555d0c3542c0/02/shuffle_0_488_0.data (No such file or directory)
at java.io.FileInputStream.open0(Native Method)
at java.io.FileInputStream.open(FileInputStream.java:195)
at java.io.FileInputStream.<init>(FileInputStream.java:138)
at org.apache.spark.network.buffer.FileSegmentManagedBuffer.createInputStream(FileSegmentManagedBuffer.java:98)
... 27 more
16/09/01 00:54:51 WARN TaskSetManager: Lost task 304.0 in stage 12.0 (TID 9286, cloud-22): java.lang.OutOfMemoryError: Java heap space
at java.io.ObjectOutputStream$HandleTable.growEntries(ObjectOutputStream.java:2347)
at java.io.ObjectOutputStream$HandleTable.assign(ObjectOutputStream.java:2276)
at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1323)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
at org.apache.spark.serializer.SerializationStream.writeValue(Serializer.scala:147)
at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:185)
at org.apache.spark.util.collection.ExternalAppendOnlyMap.spill(ExternalAppendOnlyMap.scala:206)
at org.apache.spark.util.collection.ExternalAppendOnlyMap.spill(ExternalAppendOnlyMap.scala:55)
at org.apache.spark.util.collection.Spillable$class.maybeSpill(Spillable.scala:93)
at org.apache.spark.util.collection.ExternalAppendOnlyMap.maybeSpill(ExternalAppendOnlyMap.scala:55)
at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:158)
at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:45)
at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:89)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:98)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:140)
at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:136)
16/09/01 00:55:20 INFO TaskSetManager: Starting task 375.0 in stage 12.0 (TID 9360, cloud-15, partition 375,PROCESS_LOCAL, 2268 bytes)
16/09/01 00:55:20 INFO TaskSetManager: Finished task 91.0 in stage 12.0 (TID 9093) in 1356978 ms on cloud-15 (120/720)
16/09/01 00:56:07 INFO TaskSetManager: Starting task 384.0 in stage 12.0 (TID 9361, cloud-24, partition 384,PROCESS_LOCAL, 2268 bytes)
16/09/01 00:56:07 WARN TaskSetManager: Lost task 18.0 in stage 12.0 (TID 9027, cloud-24): FetchFailed(BlockManagerId(2, cloud-22, 40528), shuffleId=22, mapId=671, reduceId=18, message=
org.apache.spark.shuffle.FetchFailedException: java.io.FileNotFoundException: /data/1/peel/spark/tmp/spark-6225354a-22f0-45dd-aff0-76051ad609ed/executor-d5fbc621-341c-4fc9-bedc-c292dc7f038a/blockmgr-c8b40f38-99a9-4060-823d-50b502bd9f91/25/shuffle_22_671_0.index (No such file or directory)
at java.io.FileInputStream.open0(Native Method)
at java.io.FileInputStream.open(FileInputStream.java:195)
at java.io.FileInputStream.<init>(FileInputStream.java:138)
at org.apache.spark.shuffle.IndexShuffleBlockResolver.getBlockData(IndexShuffleBlockResolver.scala:191)
at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:298)
at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:58)
at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:58)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
I'm running with *Spark 1.6.2*. I really can't figure out the reason behind this.
My code simply calls the library as follows:
val als = new ALS()
  .setIntermediateRDDStorageLevel(storageLevel)
  .setBlocks(numTasks)
  .setLambda(0.1)
  .setRank(50)
  .setIterations(10)
  .setSeed(42)
val model = als.run(ratings)
model.save(sc, outputPath)
sc.stop()
where:
- *ratings* is the input RDD (parallelized with *numTasks* partitions); it contains (uid, iid, rating) rows covering about 8e6 users and 1e6 items, with about 5.6e9 ratings (700 per user on average)
- *numTasks* is currently 240 * 3 (= numOfCores * 3)
- *storageLevel* is MEMORY_AND_DISK
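As a back-of-the-envelope estimate (my own arithmetic, not from the Spark docs): as far as I understand, each entry of the uncompressed in-block being sorted in the first trace holds roughly an Int srcId, an Int encoded dst index and a Float rating, i.e. ~12 bytes of primitives per rating before any JVM or boxing overhead:

val numRatings    = 5.6e9         // ~700 ratings/user * 8e6 users
val bytesPerEntry = 4 + 4 + 4     // Int srcId + Int dstEncodedIndex + Float rating (my assumption)
val numBlocks     = 720           // numTasks = 240 * 3
val totalGiB      = numRatings * bytesPerEntry / math.pow(1024, 3)            // ~62.6 GiB overall
val perBlockMiB   = numRatings * bytesPerEntry / numBlocks / math.pow(1024, 2) // ~89 MiB per block

So the raw payload per block looks modest, which makes me suspect the blow-up comes from the Integer boxing during the TimSort visible at the top of the trace rather than from the data volume itself.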
I tried several variations:
- lowering the number of blocks: 1) numTasks, 2) 240 (numOfCores), 3) letting the MLlib implementation choose it
- changing the storage level to MEMORY_ONLY
I would also have tried varying spark.shuffle.memoryFraction, but I read that it has been deprecated since Spark 1.6.
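If I understand the unified memory manager introduced in 1.6 correctly, the knobs that replaced it are spark.memory.fraction and spark.memory.storageFraction; the values below are just the 1.6 defaults, listed for reference, not settings I have tried:

spark.memory.fraction = 0.75
spark.memory.storageFraction = 0.5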
I'm running on a 15-node cluster (16 CPUs and 32 GB of memory per node) with the following relevant properties:
spark.executor.memory = 28672m
spark.driver.memory = 28672m
spark.deamon.memory = 28672m
spark.driver.maxResultSize = 0
spark.network.timeout = 3000s
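One thing I have not tried yet is enabling checkpointing, to truncate the lineage across iterations during the stage retries. If I read the MLlib API correctly it would look like the following (the checkpoint directory is just a placeholder path, and I'm assuming mllib's ALS exposes setCheckpointInterval):

// Sketch, untested: checkpoint intermediate RDDs every few iterations.
sc.setCheckpointDir("hdfs:///tmp/als-checkpoint")  // placeholder path

val als = new ALS()
  .setIntermediateRDDStorageLevel(storageLevel)
  .setBlocks(numTasks)
  .setLambda(0.1)
  .setRank(50)
  .setIterations(10)
  .setCheckpointInterval(5)  // assumed setter; checkpoint every 5 iterations
  .setSeed(42)

Would that be expected to help with the FileNotFoundException storm during resubmission?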
Any help will be appreciated. Thank you.
--
*Andrea Spina*
N.Tessera: *74598*
MAT: *89369*
*Ingegneria Informatica* *[LM] *(D.M. 270)