You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Xiangrui Meng <me...@gmail.com> on 2015/07/28 08:38:49 UTC

Re: Failed stages and dropped executors when running implicit matrix factorization/ALS : Too many values to unpack

It seems that the error happens before ALS iterations. Could you try
`ratings.first()` right after `ratings = newrdd.map(lambda l:
Rating(int(l[1]),int(l[2]),l[4])).partitionBy(50)`? -Xiangrui

On Fri, Jun 26, 2015 at 2:28 PM, Ayman Farahat <ay...@yahoo.com> wrote:
> I tried something similar and got oration error
> I had 10 executors and 10 8 cores
>
>
>>>> ratings = newrdd.map(lambda l:
>>>> Rating(int(l[1]),int(l[2]),l[4])).partitionBy(50)
>>>> mypart = ratings.getNumPartitions()
>>>> mypart
> 50
>>>> numIterations =10
>>>> rank = 100
>>>> model = ALS.trainImplicit(ratings, rank, numIterations)
> [Stage 4:>               (0 + 59) / 210][Stage 6:>               (0 + 21) /
> 210]
> [Stage 4:===========>  (169 + 41) / 210][Stage 6:=>             (24 + 39) /
> 210]
> [Stage 6:=============================================>        (178 + 32) /
> 210]
> [Stage 7:>                                                       (0 + 80) /
> 200]15/06/26 21:25:11 ERROR TaskSetManager: Task 35 in stage 7.0 failed 4
> times; aborting job
> Traceback (most recent call last):
>   File "<stdin>", line 1, in <module>
>   File
> "/homes/afarahat/aofspark/share/spark/python/pyspark/mllib/recommendation.py",
> line 200, in trainImplicit
>     model = callMLlibFunc("trainImplicitALSModel", cls._prepare(ratings),
> rank,
>   File
> "/homes/afarahat/aofspark/share/spark/python/pyspark/mllib/recommendation.py",
> line 181, in _prepare
>     first = ratings.first()
>   File "/homes/afarahat/aofspark/share/spark/python/pyspark/rdd.py", line
> 1283, in first
>     rs = self.take(1)
>   File "/homes/afarahat/aofspark/share/spark/python/pyspark/rdd.py", line
> 1265, in take
>     res = self.context.runJob(self, takeUpToNumLeft, p, True)
>   File "/homes/afarahat/aofspark/share/spark/python/pyspark/context.py",
> line 897, in runJob
>     allowLocal)
>   File
> "/homes/afarahat/aofspark/share/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py",
> line 538, in __call__
>   File
> "/homes/afarahat/aofspark/share/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py",
> line 300, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling
> z:org.apache.spark.api.python.PythonRDD.runJob.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 35
> in stage 7.0 failed 4 times, most recent failure: Lost task 35.3 in stage
> 7.0 (TID 1213, gsbl407n02.blue.ygrid.yahoo.com):
> org.apache.spark.api.python.PythonException: Traceback (most recent call
> last):
>   File
> "/grid/3/tmp/yarn-local/usercache/afarahat/appcache/application_1433921068880_966242/container_1433921068880_966242_01_000006/pyspark.zip/pyspark/worker.py",
> line 111, in main
>     process()
>   File
> "/grid/3/tmp/yarn-local/usercache/afarahat/appcache/application_1433921068880_966242/container_1433921068880_966242_01_000006/pyspark.zip/pyspark/worker.py",
> line 106, in process
>     serializer.dump_stream(func(split_index, iterator), outfile)
>   File
> "/grid/3/tmp/yarn-local/usercache/afarahat/appcache/application_1433921068880_966242/container_1433921068880_966242_01_000006/pyspark.zip/pyspark/serializers.py",
> line 133, in dump_stream
>     for obj in iterator:
>   File "/homes/afarahat/aofspark/share/spark/python/pyspark/rdd.py", line
> 1669, in add_shuffle_key
> ValueError: too many values to unpack
>
> at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:138)
> at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:179)
> at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:97)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:315)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:70)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:722)
>
> Driver stacktrace:
> at
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
> at scala.Option.foreach(Option.scala:236)
> at
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> On Jun 26, 2015, at 12:43 PM, Ravi Mody <rm...@gmail.com> wrote:
>
> I set the number of partitions on the input dataset at 50. The number of CPU
> cores I'm using is 84 (7 executors, 12 cores).
>
> I'll look into getting a full stack trace. Any idea what my errors mean, and
> why increasing memory causes them to go away? Thanks.
>
> On Fri, Jun 26, 2015 at 11:26 AM, Xiangrui Meng <me...@gmail.com> wrote:
>>
>> Please see my comments inline. It would be helpful if you can attach
>> the full stack trace. -Xiangrui
>>
>> On Fri, Jun 26, 2015 at 7:18 AM, Ravi Mody <rm...@gmail.com> wrote:
>> > 1. These are my settings:
>> > rank = 100
>> > iterations = 12
>> > users = ~20M
>> > items = ~2M
>> > training examples = ~500M-1B (I'm running into the issue even with 500M
>> > training examples)
>> >
>>
>> Did you set number of blocks? If you didn't, could you check how many
>> partitions you have in the ratings RDD? Setting a large number of
>> blocks would increase shuffle size. If you have enough RAM, try to set
>> number of blocks to the number of CPU cores or less.
>>
>> > 2. The memory storage never seems to go too high. The user blocks may go
>> > up
>> > to ~10Gb, and each executor will have a few GB used out of 30 free GB.
>> > Everything seems small compared to the amount of memory I'm using.
>> >
>>
>> This looks correct.
>>
>> > 3. I think I have a lot of disk space - is this on the executors or the
>> > driver? Is there a way to know if the error is coming from disk space.
>> >
>>
>> You can see the shuffle data size for each iteration from the WebUI.
>> Usually, it should throw an out of disk space exception instead of the
>> message you posted. But it is worth checking.
>>
>> > 4. I'm not changing checkpointing settings, but I think checkpointing
>> > defaults to every 10 iterations? One notable thing is the crashes often
>> > start on or after the 9th iteration, so it may be related to
>> > checkpointing.
>> > But this could just be a coincidence.
>> >
>>
>> If you didn't set checkpointDir in SparkContext, the
>> checkpointInterval setting in ALS has no effect.
>>
>> > Thanks!
>> >
>> >
>> >
>> >
>> >
>> > On Fri, Jun 26, 2015 at 1:08 AM, Ayman Farahat <ay...@yahoo.com>
>> > wrote:
>> >>
>> >> was there any resolution to that problem?
>> >> I am also having that with Pyspark 1.4
>> >> 380 Million observations
>> >> 100 factors and 5 iterations
>> >> Thanks
>> >> Ayman
>> >>
>> >> On Jun 23, 2015, at 6:20 PM, Xiangrui Meng <me...@gmail.com> wrote:
>> >>
>> >> > It shouldn't be hard to handle 1 billion ratings in 1.3. Just need
>> >> > more information to guess what happened:
>> >> >
>> >> > 1. Could you share the ALS settings, e.g., number of blocks, rank and
>> >> > number of iterations, as well as number of users/items in your
>> >> > dataset?
>> >> > 2. If you monitor the progress in the WebUI, how much data is stored
>> >> > in memory and how much data is shuffled per iteration?
>> >> > 3. Do you have enough disk space for the shuffle files?
>> >> > 4. Did you set checkpointDir in SparkContext and checkpointInterval
>> >> > in
>> >> > ALS?
>> >> >
>> >> > Best,
>> >> > Xiangrui
>> >> >
>> >> > On Fri, Jun 19, 2015 at 11:43 AM, Ravi Mody <rm...@gmail.com>
>> >> > wrote:
>> >> >> Hi, I'm running implicit matrix factorization/ALS in Spark 1.3.1 on
>> >> >> fairly
>> >> >> large datasets (1+ billion input records). As I grow my dataset I
>> >> >> often
>> >> >> run
>> >> >> into issues with a lot of failed stages and dropped executors,
>> >> >> ultimately
>> >> >> leading to the whole application failing. The errors are like
>> >> >> "org.apache.spark.shuffle.MetadataFetchFailedException: Missing an
>> >> >> output
>> >> >> location for shuffle 19" and
>> >> >> "org.apache.spark.shuffle.FetchFailedException:
>> >> >> Failed to connect to...". These occur during flatMap, mapPartitions,
>> >> >> and
>> >> >> aggregate stages. I know that increasing memory fixes this issue,
>> >> >> but
>> >> >> most
>> >> >> of the time my executors are only using a tiny portion of the their
>> >> >> allocated memory (<10%). Often, the stages run fine until the last
>> >> >> iteration
>> >> >> or two of ALS, but this could just be a coincidence.
>> >> >>
>> >> >> I've tried tweaking a lot of settings, but it's time-consuming to do
>> >> >> this
>> >> >> through guess-and-check. Right now I have these set:
>> >> >> spark.shuffle.memoryFraction = 0.3
>> >> >> spark.storage.memoryFraction = 0.65
>> >> >> spark.executor.heartbeatInterval = 600000
>> >> >>
>> >> >> I'm sure these settings aren't optimal - any idea of what could be
>> >> >> causing
>> >> >> my errors, and what direction I can push these settings in to get
>> >> >> more
>> >> >> out
>> >> >> of my memory? I'm currently using 240 GB of memory (on 7 executors)
>> >> >> for
>> >> >> a 1
>> >> >> billion record dataset, which seems like too much. Thanks!
>> >> >
>> >> > ---------------------------------------------------------------------
>> >> > To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>> >> > For additional commands, e-mail: user-help@spark.apache.org
>> >> >
>> >>
>> >
>
>
>

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org