You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Oleg Ruchovets <or...@gmail.com> on 2014/09/17 13:38:54 UTC

pyspark on yarn - lost executor

Hi ,
  I am execution pyspark on yarn.
I have successfully executed initial dataset but now I growed it 10 times
more.

during execution I got all the time this error:
  14/09/17 19:28:50 ERROR cluster.YarnClientClusterScheduler: Lost executor
68 on UCS-NODE1.sms1.local: remote Akka client disassociated

 tasks are failed a resubmitted again:

14/09/17 18:40:42 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at
PythonRDD.scala:252) because some of its tasks had failed: 21, 23, 26, 29,
32, 33, 48, 75, 86, 91, 93, 94
14/09/17 18:44:18 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at
PythonRDD.scala:252) because some of its tasks had failed: 31, 52, 60, 93
14/09/17 18:46:33 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at
PythonRDD.scala:252) because some of its tasks had failed: 19, 20, 23, 27,
39, 51, 64
14/09/17 18:48:27 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at
PythonRDD.scala:252) because some of its tasks had failed: 51, 68, 80
14/09/17 18:50:47 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at
PythonRDD.scala:252) because some of its tasks had failed: 1, 20, 34, 42,
61, 67, 77, 81, 91
14/09/17 18:58:50 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at
PythonRDD.scala:252) because some of its tasks had failed: 8, 21, 23, 29,
34, 40, 46, 67, 69, 86
14/09/17 19:00:44 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at
PythonRDD.scala:252) because some of its tasks had failed: 6, 13, 15, 17,
18, 19, 23, 32, 38, 39, 44, 49, 53, 54, 55, 56, 57, 59, 68, 74, 81, 85, 89
14/09/17 19:06:24 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at
PythonRDD.scala:252) because some of its tasks had failed: 20, 43, 59, 79,
92
14/09/17 19:16:13 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at
PythonRDD.scala:252) because some of its tasks had failed: 0, 2, 3, 11, 24,
31, 43, 65, 73
14/09/17 19:27:40 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at
PythonRDD.scala:252) because some of its tasks had failed: 3, 7, 41, 72,
75, 84



*QUESTION:*
   how to debug / tune the problem.
What can cause to such behavior?
I have 5 machine cluster with 32 GB ram.
 Dataset - 3G.

command for execution:


 /usr/lib/spark-1.0.1.2.1.3.0-563-bin-2.4.0.2.1.3.0-563/bin/spark-submit
--master yarn  --num-executors 12  --driver-memory 4g --executor-memory 2g
--py-files tad.zip --executor-cores 4   /usr/lib/cad/PrepareDataSetYarn.py
 /input/tad/inpuut.csv  /output/cad_model_500_2


Where can I find description of the parameters?
--num-executors 12
--driver-memory 4g
--executor-memory 2g

What parameters should be used for tuning?

Thanks
Oleg.

Re: pyspark on yarn - lost executor

Posted by Sandy Ryza <sa...@cloudera.com>.
Hi Oleg,

Those parameters control the number and size of Spark's daemons on the
cluster.  If you're interested in how these daemons relate to each other
and interact with YARN, I wrote a post on this a little while ago -
http://blog.cloudera.com/blog/2014/05/apache-spark-resource-management-and-yarn-app-models/

In general, typing "spark-submit --help" will list the available options
and what they control.

To fetch the executor logs for an application, you can use "yarn logs
-applicationId <the application ID>".

-Sandy

On Thu, Sep 18, 2014 at 5:47 AM, Oleg Ruchovets <or...@gmail.com>
wrote:

> Great.
>   Upgrade helped.
>
> Still need some inputs:
> 1) Is there any log files of spark job execution?
> 2) Where can I read about tuning / parameter configuration:
>
> For example:
> --num-executors 12
> --driver-memory 4g
> --executor-memory 2g
>
> what is the meaning of thous parameters?
>
> Thanks
> Oleg.
>
> On Thu, Sep 18, 2014 at 12:15 AM, Davies Liu <da...@databricks.com>
> wrote:
>
>> Maybe the Python worker use too much memory during groupByKey(),
>> groupByKey() with larger numPartitions can help.
>>
>> Also, can you upgrade your cluster to 1.1? It can spilling the data
>> into disks if the memory can not hold all the data during groupByKey().
>>
>> Also, If there is hot key with dozens of millions of values, the PR [1]
>> can help it, it actually helped someone with large datasets (3T).
>>
>> Davies
>>
>> [1] https://github.com/apache/spark/pull/1977
>>
>> On Wed, Sep 17, 2014 at 7:31 AM, Oleg Ruchovets <or...@gmail.com>
>> wrote:
>> >
>> > Sure, I'll post to the mail list.
>> >
>> > groupByKey(self, numPartitions=None)
>> >
>> > source code
>> >
>> > Group the values for each key in the RDD into a single sequence.
>> Hash-partitions the resulting RDD with into numPartitions partitions.
>> >
>> >
>> > So instead of using default I'll provide numPartitions , but what is
>> the best practice to calculate the number of partitions? and how number of
>> partitions related to my original problem?
>> >
>> >
>> > Thanks
>> >
>> > Oleg.
>> >
>> >
>> > http://spark.apache.org/docs/1.0.2/api/python/frames.html
>> >
>> >
>> >
>> > On Wed, Sep 17, 2014 at 9:25 PM, Eric Friedman <
>> eric.d.friedman@gmail.com> wrote:
>> >>
>> >> Look at the API for text file and groupByKey. Please don't take
>> threads off list. Other people have the same questions.
>> >>
>> >> ----
>> >> Eric Friedman
>> >>
>> >> On Sep 17, 2014, at 6:19 AM, Oleg Ruchovets <or...@gmail.com>
>> wrote:
>> >>
>> >> Can hou please explain how to configure partitions?
>> >> Thanks
>> >> Oleg
>> >>
>> >> On Wednesday, September 17, 2014, Eric Friedman <
>> eric.d.friedman@gmail.com> wrote:
>> >>>
>> >>> Yeah, you need to increase partitions. You only have one on your text
>> file. On groupByKey you're getting the pyspark default, which is too low.
>> >>>
>> >>> ----
>> >>> Eric Friedman
>> >>>
>> >>> On Sep 17, 2014, at 5:29 AM, Oleg Ruchovets <or...@gmail.com>
>> wrote:
>> >>>
>> >>> This is very good question :-).
>> >>>
>> >>> Here is my code:
>> >>>
>> >>> sc = SparkContext(appName="CAD")
>> >>>     lines = sc.textFile(sys.argv[1], 1)
>> >>>     result = lines.map(doSplit).groupByKey().mapValues(lambda vc:
>> my_custom_function(vc))
>> >>>     result.saveAsTextFile(sys.argv[2])
>> >>>
>> >>> Should I configure partitioning manually ? Where should I configure
>> it? Where can I read about partitioning best practices?
>> >>>
>> >>> Thanks
>> >>> Oleg.
>> >>>
>> >>> On Wed, Sep 17, 2014 at 8:22 PM, Eric Friedman <
>> eric.d.friedman@gmail.com> wrote:
>> >>>>
>> >>>> How many partitions do you have in your input rdd?  Are you
>> specifying numPartitions in subsequent calls to groupByKey/reduceByKey?
>> >>>>
>> >>>> On Sep 17, 2014, at 4:38 AM, Oleg Ruchovets <or...@gmail.com>
>> wrote:
>> >>>>
>> >>>> Hi ,
>> >>>>   I am execution pyspark on yarn.
>> >>>> I have successfully executed initial dataset but now I growed it 10
>> times more.
>> >>>>
>> >>>> during execution I got all the time this error:
>> >>>>   14/09/17 19:28:50 ERROR cluster.YarnClientClusterScheduler: Lost
>> executor 68 on UCS-NODE1.sms1.local: remote Akka client disassociated
>> >>>>
>> >>>>  tasks are failed a resubmitted again:
>> >>>>
>> >>>> 14/09/17 18:40:42 INFO scheduler.DAGScheduler: Resubmitting Stage 1
>> (RDD at PythonRDD.scala:252) because some of its tasks had failed: 21, 23,
>> 26, 29, 32, 33, 48, 75, 86, 91, 93, 94
>> >>>> 14/09/17 18:44:18 INFO scheduler.DAGScheduler: Resubmitting Stage 1
>> (RDD at PythonRDD.scala:252) because some of its tasks had failed: 31, 52,
>> 60, 93
>> >>>> 14/09/17 18:46:33 INFO scheduler.DAGScheduler: Resubmitting Stage 1
>> (RDD at PythonRDD.scala:252) because some of its tasks had failed: 19, 20,
>> 23, 27, 39, 51, 64
>> >>>> 14/09/17 18:48:27 INFO scheduler.DAGScheduler: Resubmitting Stage 1
>> (RDD at PythonRDD.scala:252) because some of its tasks had failed: 51, 68,
>> 80
>> >>>> 14/09/17 18:50:47 INFO scheduler.DAGScheduler: Resubmitting Stage 1
>> (RDD at PythonRDD.scala:252) because some of its tasks had failed: 1, 20,
>> 34, 42, 61, 67, 77, 81, 91
>> >>>> 14/09/17 18:58:50 INFO scheduler.DAGScheduler: Resubmitting Stage 1
>> (RDD at PythonRDD.scala:252) because some of its tasks had failed: 8, 21,
>> 23, 29, 34, 40, 46, 67, 69, 86
>> >>>> 14/09/17 19:00:44 INFO scheduler.DAGScheduler: Resubmitting Stage 1
>> (RDD at PythonRDD.scala:252) because some of its tasks had failed: 6, 13,
>> 15, 17, 18, 19, 23, 32, 38, 39, 44, 49, 53, 54, 55, 56, 57, 59, 68, 74, 81,
>> 85, 89
>> >>>> 14/09/17 19:06:24 INFO scheduler.DAGScheduler: Resubmitting Stage 1
>> (RDD at PythonRDD.scala:252) because some of its tasks had failed: 20, 43,
>> 59, 79, 92
>> >>>> 14/09/17 19:16:13 INFO scheduler.DAGScheduler: Resubmitting Stage 1
>> (RDD at PythonRDD.scala:252) because some of its tasks had failed: 0, 2, 3,
>> 11, 24, 31, 43, 65, 73
>> >>>> 14/09/17 19:27:40 INFO scheduler.DAGScheduler: Resubmitting Stage 1
>> (RDD at PythonRDD.scala:252) because some of its tasks had failed: 3, 7,
>> 41, 72, 75, 84
>> >>>>
>> >>>>
>> >>>>
>> >>>> QUESTION:
>> >>>>    how to debug / tune the problem.
>> >>>> What can cause to such behavior?
>> >>>> I have 5 machine cluster with 32 GB ram.
>> >>>>  Dataset - 3G.
>> >>>>
>> >>>> command for execution:
>> >>>>
>> >>>>
>> /usr/lib/spark-1.0.1.2.1.3.0-563-bin-2.4.0.2.1.3.0-563/bin/spark-submit
>> --master yarn  --num-executors 12  --driver-memory 4g --executor-memory 2g
>> --py-files tad.zip --executor-cores 4   /usr/lib/cad/PrepareDataSetYarn.py
>> /input/tad/inpuut.csv  /output/cad_model_500_2
>> >>>>
>> >>>>
>> >>>> Where can I find description of the parameters?
>> >>>> --num-executors 12
>> >>>> --driver-memory 4g
>> >>>> --executor-memory 2g
>> >>>>
>> >>>> What parameters should be used for tuning?
>> >>>>
>> >>>> Thanks
>> >>>> Oleg.
>> >>>>
>> >>>>
>> >>>>
>> >>>
>> >
>>
>
>

Re: pyspark on yarn - lost executor

Posted by Oleg Ruchovets <or...@gmail.com>.
Great.
  Upgrade helped.

Still need some inputs:
1) Is there any log files of spark job execution?
2) Where can I read about tuning / parameter configuration:

For example:
--num-executors 12
--driver-memory 4g
--executor-memory 2g

what is the meaning of thous parameters?

Thanks
Oleg.

On Thu, Sep 18, 2014 at 12:15 AM, Davies Liu <da...@databricks.com> wrote:

> Maybe the Python worker use too much memory during groupByKey(),
> groupByKey() with larger numPartitions can help.
>
> Also, can you upgrade your cluster to 1.1? It can spilling the data
> into disks if the memory can not hold all the data during groupByKey().
>
> Also, If there is hot key with dozens of millions of values, the PR [1]
> can help it, it actually helped someone with large datasets (3T).
>
> Davies
>
> [1] https://github.com/apache/spark/pull/1977
>
> On Wed, Sep 17, 2014 at 7:31 AM, Oleg Ruchovets <or...@gmail.com>
> wrote:
> >
> > Sure, I'll post to the mail list.
> >
> > groupByKey(self, numPartitions=None)
> >
> > source code
> >
> > Group the values for each key in the RDD into a single sequence.
> Hash-partitions the resulting RDD with into numPartitions partitions.
> >
> >
> > So instead of using default I'll provide numPartitions , but what is the
> best practice to calculate the number of partitions? and how number of
> partitions related to my original problem?
> >
> >
> > Thanks
> >
> > Oleg.
> >
> >
> > http://spark.apache.org/docs/1.0.2/api/python/frames.html
> >
> >
> >
> > On Wed, Sep 17, 2014 at 9:25 PM, Eric Friedman <
> eric.d.friedman@gmail.com> wrote:
> >>
> >> Look at the API for text file and groupByKey. Please don't take threads
> off list. Other people have the same questions.
> >>
> >> ----
> >> Eric Friedman
> >>
> >> On Sep 17, 2014, at 6:19 AM, Oleg Ruchovets <or...@gmail.com>
> wrote:
> >>
> >> Can hou please explain how to configure partitions?
> >> Thanks
> >> Oleg
> >>
> >> On Wednesday, September 17, 2014, Eric Friedman <
> eric.d.friedman@gmail.com> wrote:
> >>>
> >>> Yeah, you need to increase partitions. You only have one on your text
> file. On groupByKey you're getting the pyspark default, which is too low.
> >>>
> >>> ----
> >>> Eric Friedman
> >>>
> >>> On Sep 17, 2014, at 5:29 AM, Oleg Ruchovets <or...@gmail.com>
> wrote:
> >>>
> >>> This is very good question :-).
> >>>
> >>> Here is my code:
> >>>
> >>> sc = SparkContext(appName="CAD")
> >>>     lines = sc.textFile(sys.argv[1], 1)
> >>>     result = lines.map(doSplit).groupByKey().mapValues(lambda vc:
> my_custom_function(vc))
> >>>     result.saveAsTextFile(sys.argv[2])
> >>>
> >>> Should I configure partitioning manually ? Where should I configure
> it? Where can I read about partitioning best practices?
> >>>
> >>> Thanks
> >>> Oleg.
> >>>
> >>> On Wed, Sep 17, 2014 at 8:22 PM, Eric Friedman <
> eric.d.friedman@gmail.com> wrote:
> >>>>
> >>>> How many partitions do you have in your input rdd?  Are you
> specifying numPartitions in subsequent calls to groupByKey/reduceByKey?
> >>>>
> >>>> On Sep 17, 2014, at 4:38 AM, Oleg Ruchovets <or...@gmail.com>
> wrote:
> >>>>
> >>>> Hi ,
> >>>>   I am execution pyspark on yarn.
> >>>> I have successfully executed initial dataset but now I growed it 10
> times more.
> >>>>
> >>>> during execution I got all the time this error:
> >>>>   14/09/17 19:28:50 ERROR cluster.YarnClientClusterScheduler: Lost
> executor 68 on UCS-NODE1.sms1.local: remote Akka client disassociated
> >>>>
> >>>>  tasks are failed a resubmitted again:
> >>>>
> >>>> 14/09/17 18:40:42 INFO scheduler.DAGScheduler: Resubmitting Stage 1
> (RDD at PythonRDD.scala:252) because some of its tasks had failed: 21, 23,
> 26, 29, 32, 33, 48, 75, 86, 91, 93, 94
> >>>> 14/09/17 18:44:18 INFO scheduler.DAGScheduler: Resubmitting Stage 1
> (RDD at PythonRDD.scala:252) because some of its tasks had failed: 31, 52,
> 60, 93
> >>>> 14/09/17 18:46:33 INFO scheduler.DAGScheduler: Resubmitting Stage 1
> (RDD at PythonRDD.scala:252) because some of its tasks had failed: 19, 20,
> 23, 27, 39, 51, 64
> >>>> 14/09/17 18:48:27 INFO scheduler.DAGScheduler: Resubmitting Stage 1
> (RDD at PythonRDD.scala:252) because some of its tasks had failed: 51, 68,
> 80
> >>>> 14/09/17 18:50:47 INFO scheduler.DAGScheduler: Resubmitting Stage 1
> (RDD at PythonRDD.scala:252) because some of its tasks had failed: 1, 20,
> 34, 42, 61, 67, 77, 81, 91
> >>>> 14/09/17 18:58:50 INFO scheduler.DAGScheduler: Resubmitting Stage 1
> (RDD at PythonRDD.scala:252) because some of its tasks had failed: 8, 21,
> 23, 29, 34, 40, 46, 67, 69, 86
> >>>> 14/09/17 19:00:44 INFO scheduler.DAGScheduler: Resubmitting Stage 1
> (RDD at PythonRDD.scala:252) because some of its tasks had failed: 6, 13,
> 15, 17, 18, 19, 23, 32, 38, 39, 44, 49, 53, 54, 55, 56, 57, 59, 68, 74, 81,
> 85, 89
> >>>> 14/09/17 19:06:24 INFO scheduler.DAGScheduler: Resubmitting Stage 1
> (RDD at PythonRDD.scala:252) because some of its tasks had failed: 20, 43,
> 59, 79, 92
> >>>> 14/09/17 19:16:13 INFO scheduler.DAGScheduler: Resubmitting Stage 1
> (RDD at PythonRDD.scala:252) because some of its tasks had failed: 0, 2, 3,
> 11, 24, 31, 43, 65, 73
> >>>> 14/09/17 19:27:40 INFO scheduler.DAGScheduler: Resubmitting Stage 1
> (RDD at PythonRDD.scala:252) because some of its tasks had failed: 3, 7,
> 41, 72, 75, 84
> >>>>
> >>>>
> >>>>
> >>>> QUESTION:
> >>>>    how to debug / tune the problem.
> >>>> What can cause to such behavior?
> >>>> I have 5 machine cluster with 32 GB ram.
> >>>>  Dataset - 3G.
> >>>>
> >>>> command for execution:
> >>>>
> >>>>
> /usr/lib/spark-1.0.1.2.1.3.0-563-bin-2.4.0.2.1.3.0-563/bin/spark-submit
> --master yarn  --num-executors 12  --driver-memory 4g --executor-memory 2g
> --py-files tad.zip --executor-cores 4   /usr/lib/cad/PrepareDataSetYarn.py
> /input/tad/inpuut.csv  /output/cad_model_500_2
> >>>>
> >>>>
> >>>> Where can I find description of the parameters?
> >>>> --num-executors 12
> >>>> --driver-memory 4g
> >>>> --executor-memory 2g
> >>>>
> >>>> What parameters should be used for tuning?
> >>>>
> >>>> Thanks
> >>>> Oleg.
> >>>>
> >>>>
> >>>>
> >>>
> >
>

Re: pyspark on yarn - lost executor

Posted by Davies Liu <da...@databricks.com>.
Maybe the Python worker use too much memory during groupByKey(),
groupByKey() with larger numPartitions can help.

Also, can you upgrade your cluster to 1.1? It can spilling the data
into disks if the memory can not hold all the data during groupByKey().

Also, If there is hot key with dozens of millions of values, the PR [1]
can help it, it actually helped someone with large datasets (3T).

Davies

[1] https://github.com/apache/spark/pull/1977

On Wed, Sep 17, 2014 at 7:31 AM, Oleg Ruchovets <or...@gmail.com> wrote:
>
> Sure, I'll post to the mail list.
>
> groupByKey(self, numPartitions=None)
>
> source code
>
> Group the values for each key in the RDD into a single sequence. Hash-partitions the resulting RDD with into numPartitions partitions.
>
>
> So instead of using default I'll provide numPartitions , but what is the best practice to calculate the number of partitions? and how number of partitions related to my original problem?
>
>
> Thanks
>
> Oleg.
>
>
> http://spark.apache.org/docs/1.0.2/api/python/frames.html
>
>
>
> On Wed, Sep 17, 2014 at 9:25 PM, Eric Friedman <er...@gmail.com> wrote:
>>
>> Look at the API for text file and groupByKey. Please don't take threads off list. Other people have the same questions.
>>
>> ----
>> Eric Friedman
>>
>> On Sep 17, 2014, at 6:19 AM, Oleg Ruchovets <or...@gmail.com> wrote:
>>
>> Can hou please explain how to configure partitions?
>> Thanks
>> Oleg
>>
>> On Wednesday, September 17, 2014, Eric Friedman <er...@gmail.com> wrote:
>>>
>>> Yeah, you need to increase partitions. You only have one on your text file. On groupByKey you're getting the pyspark default, which is too low.
>>>
>>> ----
>>> Eric Friedman
>>>
>>> On Sep 17, 2014, at 5:29 AM, Oleg Ruchovets <or...@gmail.com> wrote:
>>>
>>> This is very good question :-).
>>>
>>> Here is my code:
>>>
>>> sc = SparkContext(appName="CAD")
>>>     lines = sc.textFile(sys.argv[1], 1)
>>>     result = lines.map(doSplit).groupByKey().mapValues(lambda vc: my_custom_function(vc))
>>>     result.saveAsTextFile(sys.argv[2])
>>>
>>> Should I configure partitioning manually ? Where should I configure it? Where can I read about partitioning best practices?
>>>
>>> Thanks
>>> Oleg.
>>>
>>> On Wed, Sep 17, 2014 at 8:22 PM, Eric Friedman <er...@gmail.com> wrote:
>>>>
>>>> How many partitions do you have in your input rdd?  Are you specifying numPartitions in subsequent calls to groupByKey/reduceByKey?
>>>>
>>>> On Sep 17, 2014, at 4:38 AM, Oleg Ruchovets <or...@gmail.com> wrote:
>>>>
>>>> Hi ,
>>>>   I am execution pyspark on yarn.
>>>> I have successfully executed initial dataset but now I growed it 10 times more.
>>>>
>>>> during execution I got all the time this error:
>>>>   14/09/17 19:28:50 ERROR cluster.YarnClientClusterScheduler: Lost executor 68 on UCS-NODE1.sms1.local: remote Akka client disassociated
>>>>
>>>>  tasks are failed a resubmitted again:
>>>>
>>>> 14/09/17 18:40:42 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at PythonRDD.scala:252) because some of its tasks had failed: 21, 23, 26, 29, 32, 33, 48, 75, 86, 91, 93, 94
>>>> 14/09/17 18:44:18 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at PythonRDD.scala:252) because some of its tasks had failed: 31, 52, 60, 93
>>>> 14/09/17 18:46:33 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at PythonRDD.scala:252) because some of its tasks had failed: 19, 20, 23, 27, 39, 51, 64
>>>> 14/09/17 18:48:27 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at PythonRDD.scala:252) because some of its tasks had failed: 51, 68, 80
>>>> 14/09/17 18:50:47 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at PythonRDD.scala:252) because some of its tasks had failed: 1, 20, 34, 42, 61, 67, 77, 81, 91
>>>> 14/09/17 18:58:50 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at PythonRDD.scala:252) because some of its tasks had failed: 8, 21, 23, 29, 34, 40, 46, 67, 69, 86
>>>> 14/09/17 19:00:44 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at PythonRDD.scala:252) because some of its tasks had failed: 6, 13, 15, 17, 18, 19, 23, 32, 38, 39, 44, 49, 53, 54, 55, 56, 57, 59, 68, 74, 81, 85, 89
>>>> 14/09/17 19:06:24 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at PythonRDD.scala:252) because some of its tasks had failed: 20, 43, 59, 79, 92
>>>> 14/09/17 19:16:13 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at PythonRDD.scala:252) because some of its tasks had failed: 0, 2, 3, 11, 24, 31, 43, 65, 73
>>>> 14/09/17 19:27:40 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at PythonRDD.scala:252) because some of its tasks had failed: 3, 7, 41, 72, 75, 84
>>>>
>>>>
>>>>
>>>> QUESTION:
>>>>    how to debug / tune the problem.
>>>> What can cause to such behavior?
>>>> I have 5 machine cluster with 32 GB ram.
>>>>  Dataset - 3G.
>>>>
>>>> command for execution:
>>>>
>>>>      /usr/lib/spark-1.0.1.2.1.3.0-563-bin-2.4.0.2.1.3.0-563/bin/spark-submit --master yarn  --num-executors 12  --driver-memory 4g --executor-memory 2g --py-files tad.zip --executor-cores 4   /usr/lib/cad/PrepareDataSetYarn.py  /input/tad/inpuut.csv  /output/cad_model_500_2
>>>>
>>>>
>>>> Where can I find description of the parameters?
>>>> --num-executors 12
>>>> --driver-memory 4g
>>>> --executor-memory 2g
>>>>
>>>> What parameters should be used for tuning?
>>>>
>>>> Thanks
>>>> Oleg.
>>>>
>>>>
>>>>
>>>
>

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: pyspark on yarn - lost executor

Posted by Oleg Ruchovets <or...@gmail.com>.
Sure, I'll post to the mail list.
groupByKey(self, numPartitions=None)source code
<http://spark.apache.org/docs/1.0.2/api/python/pyspark.rdd-pysrc.html#RDD.groupByKey>


Group the values for each key in the RDD into a single sequence.
Hash-partitions the resulting RDD with into numPartitions partitions.


So instead of using default I'll provide numPartitions , but what is the
best practice to calculate the number of partitions? and how number of
partitions related to my original problem?


Thanks

Oleg.

http://spark.apache.org/docs/1.0.2/api/python/frames.html



On Wed, Sep 17, 2014 at 9:25 PM, Eric Friedman <er...@gmail.com>
wrote:

> Look at the API for text file and groupByKey. Please don't take threads
> off list. Other people have the same questions.
>
> ----
> Eric Friedman
>
> On Sep 17, 2014, at 6:19 AM, Oleg Ruchovets <or...@gmail.com> wrote:
>
> Can hou please explain how to configure partitions?
> Thanks
> Oleg
>
> On Wednesday, September 17, 2014, Eric Friedman <er...@gmail.com>
> wrote:
>
>> Yeah, you need to increase partitions. You only have one on your text
>> file. On groupByKey you're getting the pyspark default, which is too low.
>>
>> ----
>> Eric Friedman
>>
>> On Sep 17, 2014, at 5:29 AM, Oleg Ruchovets <or...@gmail.com> wrote:
>>
>> This is very good question :-).
>>
>> Here is my code:
>>
>> sc = SparkContext(appName="CAD")
>>     lines = sc.textFile(sys.argv[1], 1)
>>     result = lines.map(doSplit).groupByKey().mapValues(lambda vc:
>> my_custom_function(vc))
>>     result.saveAsTextFile(sys.argv[2])
>>
>> Should I configure partitioning manually ? Where should I configure it?
>> Where can I read about partitioning best practices?
>>
>> Thanks
>> Oleg.
>>
>> On Wed, Sep 17, 2014 at 8:22 PM, Eric Friedman <eric.d.friedman@gmail.com
>> > wrote:
>>
>>> How many partitions do you have in your input rdd?  Are you specifying
>>> numPartitions in subsequent calls to groupByKey/reduceByKey?
>>>
>>> On Sep 17, 2014, at 4:38 AM, Oleg Ruchovets <or...@gmail.com>
>>> wrote:
>>>
>>> Hi ,
>>>   I am execution pyspark on yarn.
>>> I have successfully executed initial dataset but now I growed it 10
>>> times more.
>>>
>>> during execution I got all the time this error:
>>>   14/09/17 19:28:50 ERROR cluster.YarnClientClusterScheduler: Lost
>>> executor 68 on UCS-NODE1.sms1.local: remote Akka client disassociated
>>>
>>>  tasks are failed a resubmitted again:
>>>
>>> 14/09/17 18:40:42 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD
>>> at PythonRDD.scala:252) because some of its tasks had failed: 21, 23, 26,
>>> 29, 32, 33, 48, 75, 86, 91, 93, 94
>>> 14/09/17 18:44:18 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD
>>> at PythonRDD.scala:252) because some of its tasks had failed: 31, 52, 60, 93
>>> 14/09/17 18:46:33 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD
>>> at PythonRDD.scala:252) because some of its tasks had failed: 19, 20, 23,
>>> 27, 39, 51, 64
>>> 14/09/17 18:48:27 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD
>>> at PythonRDD.scala:252) because some of its tasks had failed: 51, 68, 80
>>> 14/09/17 18:50:47 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD
>>> at PythonRDD.scala:252) because some of its tasks had failed: 1, 20, 34,
>>> 42, 61, 67, 77, 81, 91
>>> 14/09/17 18:58:50 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD
>>> at PythonRDD.scala:252) because some of its tasks had failed: 8, 21, 23,
>>> 29, 34, 40, 46, 67, 69, 86
>>> 14/09/17 19:00:44 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD
>>> at PythonRDD.scala:252) because some of its tasks had failed: 6, 13, 15,
>>> 17, 18, 19, 23, 32, 38, 39, 44, 49, 53, 54, 55, 56, 57, 59, 68, 74, 81, 85,
>>> 89
>>> 14/09/17 19:06:24 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD
>>> at PythonRDD.scala:252) because some of its tasks had failed: 20, 43, 59,
>>> 79, 92
>>> 14/09/17 19:16:13 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD
>>> at PythonRDD.scala:252) because some of its tasks had failed: 0, 2, 3, 11,
>>> 24, 31, 43, 65, 73
>>> 14/09/17 19:27:40 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD
>>> at PythonRDD.scala:252) because some of its tasks had failed: 3, 7, 41, 72,
>>> 75, 84
>>>
>>>
>>>
>>> *QUESTION:*
>>>    how to debug / tune the problem.
>>> What can cause to such behavior?
>>> I have 5 machine cluster with 32 GB ram.
>>>  Dataset - 3G.
>>>
>>> command for execution:
>>>
>>>
>>>  /usr/lib/spark-1.0.1.2.1.3.0-563-bin-2.4.0.2.1.3.0-563/bin/spark-submit
>>> --master yarn  --num-executors 12  --driver-memory 4g --executor-memory 2g
>>> --py-files tad.zip --executor-cores 4   /usr/lib/cad/PrepareDataSetYarn.py
>>>  /input/tad/inpuut.csv  /output/cad_model_500_2
>>>
>>>
>>> Where can I find description of the parameters?
>>> --num-executors 12
>>> --driver-memory 4g
>>> --executor-memory 2g
>>>
>>> What parameters should be used for tuning?
>>>
>>> Thanks
>>> Oleg.
>>>
>>>
>>>
>>>
>>

Re: pyspark on yarn - lost executor

Posted by Eric Friedman <er...@gmail.com>.
How many partitions do you have in your input rdd?  Are you specifying numPartitions in subsequent calls to groupByKey/reduceByKey?  

> On Sep 17, 2014, at 4:38 AM, Oleg Ruchovets <or...@gmail.com> wrote:
> 
> Hi , 
>   I am execution pyspark on yarn.
> I have successfully executed initial dataset but now I growed it 10 times more.
> 
> during execution I got all the time this error:
>   14/09/17 19:28:50 ERROR cluster.YarnClientClusterScheduler: Lost executor 68 on UCS-NODE1.sms1.local: remote Akka client disassociated
> 
>  tasks are failed a resubmitted again:
> 
> 14/09/17 18:40:42 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at PythonRDD.scala:252) because some of its tasks had failed: 21, 23, 26, 29, 32, 33, 48, 75, 86, 91, 93, 94
> 14/09/17 18:44:18 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at PythonRDD.scala:252) because some of its tasks had failed: 31, 52, 60, 93
> 14/09/17 18:46:33 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at PythonRDD.scala:252) because some of its tasks had failed: 19, 20, 23, 27, 39, 51, 64
> 14/09/17 18:48:27 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at PythonRDD.scala:252) because some of its tasks had failed: 51, 68, 80
> 14/09/17 18:50:47 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at PythonRDD.scala:252) because some of its tasks had failed: 1, 20, 34, 42, 61, 67, 77, 81, 91
> 14/09/17 18:58:50 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at PythonRDD.scala:252) because some of its tasks had failed: 8, 21, 23, 29, 34, 40, 46, 67, 69, 86
> 14/09/17 19:00:44 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at PythonRDD.scala:252) because some of its tasks had failed: 6, 13, 15, 17, 18, 19, 23, 32, 38, 39, 44, 49, 53, 54, 55, 56, 57, 59, 68, 74, 81, 85, 89
> 14/09/17 19:06:24 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at PythonRDD.scala:252) because some of its tasks had failed: 20, 43, 59, 79, 92
> 14/09/17 19:16:13 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at PythonRDD.scala:252) because some of its tasks had failed: 0, 2, 3, 11, 24, 31, 43, 65, 73
> 14/09/17 19:27:40 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at PythonRDD.scala:252) because some of its tasks had failed: 3, 7, 41, 72, 75, 84
> 
> 
> 
> QUESTION:
>    how to debug / tune the problem.
> What can cause to such behavior? 
> I have 5 machine cluster with 32 GB ram.
>  Dataset - 3G.
> 
> command for execution:
> 
>      /usr/lib/spark-1.0.1.2.1.3.0-563-bin-2.4.0.2.1.3.0-563/bin/spark-submit --master yarn  --num-executors 12  --driver-memory 4g --executor-memory 2g --py-files tad.zip --executor-cores 4   /usr/lib/cad/PrepareDataSetYarn.py  /input/tad/inpuut.csv  /output/cad_model_500_2 
> 
> 
> Where can I find description of the parameters? 
> --num-executors 12  
> --driver-memory 4g 
> --executor-memory 2g
> 
> What parameters should be used for tuning?
> 
> Thanks
> Oleg.
> 
> 
>