Posted to dev@spark.apache.org by Jeff Zhang <zj...@gmail.com> on 2016/02/26 11:44:30 UTC

Is spark.driver.maxResultSize used correctly ?

My job hits this exception very easily, even when I set a large value for
spark.driver.maxResultSize. After checking the Spark code, I found that
spark.driver.maxResultSize is also used on the executor side to decide
whether a DirectTaskResult or an IndirectTaskResult is sent. This doesn't
make sense to me. Using spark.driver.maxResultSize / taskNum might be more
appropriate: if spark.driver.maxResultSize is 1g and we have 10 tasks, each
with 200m of output, then the output of each task is less than
spark.driver.maxResultSize, so a DirectTaskResult will be sent to the
driver, but the total result size is 2g, which will cause an exception on
the driver side.
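
Roughly, the two checks look like this (a simplified Scala sketch of how I
read the code, not the actual Spark source; names and signatures are
approximate):

import java.nio.ByteBuffer

// Simplified sketch, NOT the actual Spark source.
sealed trait TaskResultMsg
case class DirectResult(bytes: ByteBuffer) extends TaskResultMsg
case class IndirectResult(sizeBytes: Long) extends TaskResultMsg // fetched or dropped later

// Executor side: each task compares only ITS OWN serialized size to the limit.
def chooseResult(serialized: ByteBuffer, maxResultSize: Long): TaskResultMsg = {
  val resultSize = serialized.limit()
  if (maxResultSize > 0 && resultSize > maxResultSize) IndirectResult(resultSize)
  else DirectResult(serialized) // 200m < 1g, so every task takes this branch
}

// Driver side: the SAME limit is applied to the running total across all tasks.
var totalResultSize = 0L
def canFetchMoreResults(size: Long, maxResultSize: Long): Boolean = {
  totalResultSize += size
  maxResultSize <= 0 || totalResultSize <= maxResultSize // 10 * 200m = 2g > 1g => abort
}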


16/02/26 10:10:49 INFO DAGScheduler: Job 4 failed: treeAggregate at
LogisticRegression.scala:283, took 33.796379 s

Exception in thread "main" org.apache.spark.SparkException: Job aborted due
to stage failure: Total size of serialized results of 1 tasks (1085.0 MB)
is bigger than spark.driver.maxResultSize (1024.0 MB)


-- 
Best Regards

Jeff Zhang

Re: Is spark.driver.maxResultSize used correctly ?

Posted by Jeff Zhang <zj...@gmail.com>.
I checked the code again. It looks like the task result is currently loaded
into memory regardless of whether it is a DirectTaskResult or an
IndirectTaskResult. Previously I thought an IndirectTaskResult could be
loaded into memory lazily, which would save memory; something like
RDD#collectAsIterator is what I had in mind as a way to save memory.
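
Something like the existing RDD#toLocalIterator is close to what I mean: it
runs one job per partition and only holds a single partition's data on the
driver at a time. A minimal sketch (illustrative app name and sizes, run
locally):

import org.apache.spark.{SparkConf, SparkContext}

// Stream results to the driver one partition at a time instead of
// materializing everything with collect().
val sc = new SparkContext(new SparkConf().setAppName("iterator-demo").setMaster("local[2]"))
val rdd = sc.parallelize(1 to 1000000, 10)
rdd.toLocalIterator.take(5).foreach(println) // only the partitions actually consumed are fetched
sc.stop()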

On Tue, Mar 1, 2016 at 5:00 PM, Reynold Xin <rx...@databricks.com> wrote:

> How big of a deal is this, though? If I am reading your email correctly,
> either way this job will fail. You simply want it to fail earlier on the
> executor side, rather than collecting the results and failing on the
> driver side?
>
>
> On Sunday, February 28, 2016, Jeff Zhang <zj...@gmail.com> wrote:
>
>> Data skew is possible, but it is not the common case. I think we should
>> design for the common case; for the skew case, we could add a fraction
>> parameter to allow the user to tune it.
>>
>> On Sat, Feb 27, 2016 at 4:51 PM, Reynold Xin <rx...@databricks.com> wrote:
>>
>>> But sometimes you might have skew, and almost all of the result data
>>> ends up in one or a few tasks.
>>>
>>>
>>> On Friday, February 26, 2016, Jeff Zhang <zj...@gmail.com> wrote:
>>>
>>>>
>>>> My job hits this exception very easily, even when I set a large value
>>>> for spark.driver.maxResultSize. After checking the Spark code, I found
>>>> that spark.driver.maxResultSize is also used on the executor side to
>>>> decide whether a DirectTaskResult or an IndirectTaskResult is sent.
>>>> This doesn't make sense to me. Using spark.driver.maxResultSize /
>>>> taskNum might be more appropriate: if spark.driver.maxResultSize is 1g
>>>> and we have 10 tasks, each with 200m of output, then the output of each
>>>> task is less than spark.driver.maxResultSize, so a DirectTaskResult
>>>> will be sent to the driver, but the total result size is 2g, which will
>>>> cause an exception on the driver side.
>>>>
>>>>
>>>> 16/02/26 10:10:49 INFO DAGScheduler: Job 4 failed: treeAggregate at
>>>> LogisticRegression.scala:283, took 33.796379 s
>>>>
>>>> Exception in thread "main" org.apache.spark.SparkException: Job aborted
>>>> due to stage failure: Total size of serialized results of 1 tasks (1085.0
>>>> MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
>>>>
>>>>
>>>> --
>>>> Best Regards
>>>>
>>>> Jeff Zhang
>>>>
>>>
>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>


-- 
Best Regards

Jeff Zhang

Re: Is spark.driver.maxResultSize used correctly ?

Posted by Reynold Xin <rx...@databricks.com>.
How big of a deal is this, though? If I am reading your email correctly,
either way this job will fail. You simply want it to fail earlier on the
executor side, rather than collecting the results and failing on the driver
side?

On Sunday, February 28, 2016, Jeff Zhang <zj...@gmail.com> wrote:

> Data skew is possible, but it is not the common case. I think we should
> design for the common case; for the skew case, we could add a fraction
> parameter to allow the user to tune it.
>
> On Sat, Feb 27, 2016 at 4:51 PM, Reynold Xin <rxin@databricks.com> wrote:
>
>> But sometimes you might have skew, and almost all of the result data
>> ends up in one or a few tasks.
>>
>>
>> On Friday, February 26, 2016, Jeff Zhang <zjffdu@gmail.com> wrote:
>>
>>>
>>> My job hits this exception very easily, even when I set a large value
>>> for spark.driver.maxResultSize. After checking the Spark code, I found
>>> that spark.driver.maxResultSize is also used on the executor side to
>>> decide whether a DirectTaskResult or an IndirectTaskResult is sent. This
>>> doesn't make sense to me. Using spark.driver.maxResultSize / taskNum
>>> might be more appropriate: if spark.driver.maxResultSize is 1g and we
>>> have 10 tasks, each with 200m of output, then the output of each task is
>>> less than spark.driver.maxResultSize, so a DirectTaskResult will be sent
>>> to the driver, but the total result size is 2g, which will cause an
>>> exception on the driver side.
>>>
>>>
>>> 16/02/26 10:10:49 INFO DAGScheduler: Job 4 failed: treeAggregate at
>>> LogisticRegression.scala:283, took 33.796379 s
>>>
>>> Exception in thread "main" org.apache.spark.SparkException: Job aborted
>>> due to stage failure: Total size of serialized results of 1 tasks (1085.0
>>> MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
>>>
>>>
>>> --
>>> Best Regards
>>>
>>> Jeff Zhang
>>>
>>
>
>
> --
> Best Regards
>
> Jeff Zhang
>

Re: Is spark.driver.maxResultSize used correctly ?

Posted by Jeff Zhang <zj...@gmail.com>.
Data skew is possible, but it is not the common case. I think we should
design for the common case; for the skew case, we could add a fraction
parameter to allow the user to tune it.
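
Concretely, something along these lines is what I have in mind (just a
sketch of the proposal, not existing Spark behavior; the fraction knob is
hypothetical):

// Sketch of the proposal, not existing Spark behavior.
// Per-task threshold = driver limit / number of tasks, scaled by a tunable
// fraction so that skewed jobs can still let a few tasks send larger results.
def perTaskLimit(maxResultSize: Long, numTasks: Int, skewFraction: Double = 1.0): Long = {
  require(numTasks > 0, "numTasks must be positive")
  (maxResultSize.toDouble / numTasks * skewFraction).toLong
}

val oneGb = 1024L * 1024 * 1024
perTaskLimit(oneGb, 10)                      // ~100m per task for the common case
perTaskLimit(oneGb, 10, skewFraction = 5.0)  // ~500m per task when the user expects skew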

On Sat, Feb 27, 2016 at 4:51 PM, Reynold Xin <rx...@databricks.com> wrote:

> But sometimes you might have skew, and almost all of the result data ends
> up in one or a few tasks.
>
>
> On Friday, February 26, 2016, Jeff Zhang <zj...@gmail.com> wrote:
>
>>
>> My job hits this exception very easily, even when I set a large value for
>> spark.driver.maxResultSize. After checking the Spark code, I found that
>> spark.driver.maxResultSize is also used on the executor side to decide
>> whether a DirectTaskResult or an IndirectTaskResult is sent. This doesn't
>> make sense to me. Using spark.driver.maxResultSize / taskNum might be more
>> appropriate: if spark.driver.maxResultSize is 1g and we have 10 tasks,
>> each with 200m of output, then the output of each task is less than
>> spark.driver.maxResultSize, so a DirectTaskResult will be sent to the
>> driver, but the total result size is 2g, which will cause an exception on
>> the driver side.
>>
>>
>> 16/02/26 10:10:49 INFO DAGScheduler: Job 4 failed: treeAggregate at
>> LogisticRegression.scala:283, took 33.796379 s
>>
>> Exception in thread "main" org.apache.spark.SparkException: Job aborted
>> due to stage failure: Total size of serialized results of 1 tasks (1085.0
>> MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>


-- 
Best Regards

Jeff Zhang

Re: Is spark.driver.maxResultSize used correctly ?

Posted by Reynold Xin <rx...@databricks.com>.
But sometimes you might have skew, and almost all of the result data ends up
in one or a few tasks.
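
To make that concrete, here is a small illustration of the kind of skew I
mean (run in spark-shell, where sc is predefined; the data is made up):

// One key dominates, so one task's result dwarfs all of the others.
val pairs = sc.parallelize(0 until 1000000).map(i => (if (i % 100 == 0) i / 100 else 0, i))
val rowsPerPartition = pairs.groupByKey(10)
  .mapPartitions(groups => Iterator(groups.map(_._2.size).sum))
  .collect()
// Nine partitions hold about a thousand rows each; the partition that owns
// key 0 holds roughly 99% of them, so its result alone could exceed a
// per-task share of the limit even though the job as a whole is fine.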

On Friday, February 26, 2016, Jeff Zhang <zj...@gmail.com> wrote:

>
> My job hits this exception very easily, even when I set a large value for
> spark.driver.maxResultSize. After checking the Spark code, I found that
> spark.driver.maxResultSize is also used on the executor side to decide
> whether a DirectTaskResult or an IndirectTaskResult is sent. This doesn't
> make sense to me. Using spark.driver.maxResultSize / taskNum might be more
> appropriate: if spark.driver.maxResultSize is 1g and we have 10 tasks, each
> with 200m of output, then the output of each task is less than
> spark.driver.maxResultSize, so a DirectTaskResult will be sent to the
> driver, but the total result size is 2g, which will cause an exception on
> the driver side.
>
>
> 16/02/26 10:10:49 INFO DAGScheduler: Job 4 failed: treeAggregate at
> LogisticRegression.scala:283, took 33.796379 s
>
> Exception in thread "main" org.apache.spark.SparkException: Job aborted
> due to stage failure: Total size of serialized results of 1 tasks (1085.0
> MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
>
>
> --
> Best Regards
>
> Jeff Zhang
>
