You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@spark.apache.org by prakhar jauhari <pr...@gmail.com> on 2015/10/19 09:51:21 UTC

Spark driver reducing total executors count even when Dynamic Allocation is disabled.

Hey all,

Thanks in advance. I ran into a situation where spark driver reduced the
total executors count for my job even with dynamic allocation disabled, and
caused the job to hang for ever. 

Setup: 
Spark-1.3.1 on hadoop-yarn-2.4.0 cluster. 
All servers in cluster running Linux version 2.6.32. 
Job in yarn-client mode.

Scenario:
1. Application running with required number of executors.
2. One of the DN's losses connectivity and is timed out.
2. Spark issues a killExecutor for the executor on the DN which was timed
out. 
3. Even with dynamic allocation off, spark's driver reduces the
"targetNumExecutors".

On analysing the code (Spark 1.3.1): 

When my DN goes unreachable: 
	
Spark core's HeartbeatReceiver invokes expireDeadHosts(): which checks if
Dynamic Allocation is supported and then invokes "sc.killExecutor()"

	/if (sc.supportDynamicAllocation) {
          	sc.killExecutor(executorId)
        }/ 

Surprisingly supportDynamicAllocation in sparkContext.scala is defined as,
resulting "True" if dynamicAllocationTesting flag is enabled or spark is
running over "yarn".

/private[spark] def supportDynamicAllocation = 
    		    master.contains("yarn") || dynamicAllocationTesting	/

"sc.killExecutor()" matches it to configured "schedulerBackend"
(CoarseGrainedSchedulerBackend in this case) and invokes
"killExecutors(executorIds)"

CoarseGrainedSchedulerBackend calculates a "newTotal" for the total number
of executors required, and sends a update to application master by invoking
"doRequestTotalExecutors(newTotal)"
 
CoarseGrainedSchedulerBackend then invokes a
"doKillExecutors(filteredExecutorIds)" for the lost executors. 

Thus reducing the total number of executors in a host intermittently
unreachable scenario.


I noticed that this change to "CoarseGrainedSchedulerBackend" was introduced
while fixing :  https://issues.apache.org/jira/browse/SPARK-6325
<https://issues.apache.org/jira/browse/SPARK-6325>  



I am new to this code, If any of you could comment on why do we need
"doRequestTotalExecutors" in "killExecutors" would be a great help. Also why
do we have "supportDynamicAllocation" = True even if i have not enabled
dynamic allocation. 

Regards,
Prakhar.




--
View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/Spark-driver-reducing-total-executors-count-even-when-Dynamic-Allocation-is-disabled-tp14679.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@spark.apache.org
For additional commands, e-mail: dev-help@spark.apache.org


Re: Spark driver reducing total executors count even when Dynamic Allocation is disabled.

Posted by Saisai Shao <sa...@gmail.com>.
Hi Prakhar,

I start to know your problem, you expected that the killed exexcutor by
heartbeat mechanism should be launched again but seems not. This problem I
think is fixed in the version 1.5 of Spark, you could check this jira
https://issues.apache.org/jira/browse/SPARK-8119

Thanks
Saisai

2015年10月20日星期二,prakhar jauhari <pr...@gmail.com> 写道:

> Thanks sai for the input,
>
> So the problem is : i start my job with some fixed number of executors,
> but when a host running my executors goes unreachable, driver reduces the
> total number of executors. And never increases it.
>
> I have a repro for the issue, attaching logs:
> !!!! Running spark job is configured for 2 executors, dynamic allocation
> not enabled !!!!!!!
>
> AM starts requesting the 2 executors:
> 15/10/19 12:25:58 INFO yarn.YarnRMClient: Registering the ApplicationMaster
> 15/10/19 12:25:59 INFO yarn.YarnAllocator: Will request 2 executor
> containers, each with 1 cores and 1408 MB memory including 384 MB overhead
> 15/10/19 12:25:59 INFO yarn.YarnAllocator: Container request (host: Any,
> capability: <memory:1408, vCores:1>)
> 15/10/19 12:25:59 INFO yarn.YarnAllocator: Container request (host: Any,
> capability: <memory:1408, vCores:1>)
> 15/10/19 12:25:59 INFO yarn.ApplicationMaster: Started progress reporter
> thread - sleep time : 5000
>
> Executors launched:
> 15/10/19 12:26:04 INFO impl.AMRMClientImpl: Received new token for :
> DN-2:58739
> 15/10/19 12:26:04 INFO impl.AMRMClientImpl: Received new token for :
> DN-1:44591
> 15/10/19 12:26:04 INFO yarn.YarnAllocator: Launching container
> container_1444841612643_0014_01_000002 for on host DN-2
> 15/10/19 12:26:04 INFO yarn.YarnAllocator: Launching ExecutorRunnable.
> driverUrl: akka.tcp://sparkDriver@NN-1:35115/user/CoarseGrainedScheduler,
> executorHostname: DN-2
> 15/10/19 12:26:04 INFO yarn.YarnAllocator: Launching container
> container_1444841612643_0014_01_000003 for on host DN-1
> 15/10/19 12:26:04 INFO yarn.YarnAllocator: Launching ExecutorRunnable.
> driverUrl: akka.tcp://sparkDriver@NN-1:35115/user/CoarseGrainedScheduler,
> executorHostname: DN-1
>
> Now my AM and executor 1 are running on DN-2, DN-1 has executor 2 running
> on it. To reproduce this issue I removed IP from DN-1, until it was timed
> out by spark.
> 15/10/19 13:03:30 INFO yarn.YarnAllocator: Driver requested a total number
> of 1 executor(s).
> 15/10/19 13:03:30 INFO yarn.ApplicationMaster: Driver requested to kill
> executor(s) 2.
>
>
> So the driver has reduced the total number of executor to : 1
> And now even when the DN comes up and rejoins the cluster, this count is
> not increased.
> If I had executor 1 running on a separate DN (not the same as AM's DN),
> and that DN went unreachable, driver would reduce total number of executor
> to : 0 and the job hangs forever. And this is when i have not enabled
> Dynamic allocation. My cluster has other DN's available, AM should request
> the killed executors from yarn, and get it on some other DN's.
>
> Regards,
> Prakhar
>
>
> On Mon, Oct 19, 2015 at 2:47 PM, Saisai Shao <sai.sai.shao@gmail.com
> <javascript:_e(%7B%7D,'cvml','sai.sai.shao@gmail.com');>> wrote:
>
>> This is a deliberate killing request by heartbeat mechanism, have nothing
>> to do with dynamic allocation. Here because you're running on yarn mode, so
>> "supportDynamicAllocation" will be true, but actually there's no
>> relation to dynamic allocation.
>>
>> From my understanding "doRequestTotalExecutors" is to sync the current
>> total executor number with AM, AM will try to cancel some pending container
>> requests when current expected executor number is less. The actual
>> container killing command is issued by "doRequestTotalExecutors".
>>
>> Not sure what is your actual problem? is it unexpected?
>>
>> Thanks
>> Saisai
>>
>>
>> On Mon, Oct 19, 2015 at 3:51 PM, prakhar jauhari <prak840@gmail.com
>> <javascript:_e(%7B%7D,'cvml','prak840@gmail.com');>> wrote:
>>
>>> Hey all,
>>>
>>> Thanks in advance. I ran into a situation where spark driver reduced the
>>> total executors count for my job even with dynamic allocation disabled,
>>> and
>>> caused the job to hang for ever.
>>>
>>> Setup:
>>> Spark-1.3.1 on hadoop-yarn-2.4.0 cluster.
>>> All servers in cluster running Linux version 2.6.32.
>>> Job in yarn-client mode.
>>>
>>> Scenario:
>>> 1. Application running with required number of executors.
>>> 2. One of the DN's losses connectivity and is timed out.
>>> 2. Spark issues a killExecutor for the executor on the DN which was timed
>>> out.
>>> 3. Even with dynamic allocation off, spark's driver reduces the
>>> "targetNumExecutors".
>>>
>>> On analysing the code (Spark 1.3.1):
>>>
>>> When my DN goes unreachable:
>>>
>>> Spark core's HeartbeatReceiver invokes expireDeadHosts(): which checks if
>>> Dynamic Allocation is supported and then invokes "sc.killExecutor()"
>>>
>>>         /if (sc.supportDynamicAllocation) {
>>>                 sc.killExecutor(executorId)
>>>         }/
>>>
>>> Surprisingly supportDynamicAllocation in sparkContext.scala is defined
>>> as,
>>> resulting "True" if dynamicAllocationTesting flag is enabled or spark is
>>> running over "yarn".
>>>
>>> /private[spark] def supportDynamicAllocation =
>>>                     master.contains("yarn") || dynamicAllocationTesting /
>>>
>>> "sc.killExecutor()" matches it to configured "schedulerBackend"
>>> (CoarseGrainedSchedulerBackend in this case) and invokes
>>> "killExecutors(executorIds)"
>>>
>>> CoarseGrainedSchedulerBackend calculates a "newTotal" for the total
>>> number
>>> of executors required, and sends a update to application master by
>>> invoking
>>> "doRequestTotalExecutors(newTotal)"
>>>
>>> CoarseGrainedSchedulerBackend then invokes a
>>> "doKillExecutors(filteredExecutorIds)" for the lost executors.
>>>
>>> Thus reducing the total number of executors in a host intermittently
>>> unreachable scenario.
>>>
>>>
>>> I noticed that this change to "CoarseGrainedSchedulerBackend" was
>>> introduced
>>> while fixing :  https://issues.apache.org/jira/browse/SPARK-6325
>>> <https://issues.apache.org/jira/browse/SPARK-6325>
>>>
>>>
>>>
>>> I am new to this code, If any of you could comment on why do we need
>>> "doRequestTotalExecutors" in "killExecutors" would be a great help. Also
>>> why
>>> do we have "supportDynamicAllocation" = True even if i have not enabled
>>> dynamic allocation.
>>>
>>> Regards,
>>> Prakhar.
>>>
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-developers-list.1001551.n3.nabble.com/Spark-driver-reducing-total-executors-count-even-when-Dynamic-Allocation-is-disabled-tp14679.html
>>> Sent from the Apache Spark Developers List mailing list archive at
>>> Nabble.com.
>>>
>>> ---------------------------------------------------------------------
>>> To unsubscribe, e-mail: dev-unsubscribe@spark.apache.org
>>> <javascript:_e(%7B%7D,'cvml','dev-unsubscribe@spark.apache.org');>
>>> For additional commands, e-mail: dev-help@spark.apache.org
>>> <javascript:_e(%7B%7D,'cvml','dev-help@spark.apache.org');>
>>>
>>>
>>
>

Re: Spark driver reducing total executors count even when Dynamic Allocation is disabled.

Posted by prakhar jauhari <pr...@gmail.com>.
Thanks sai for the input,

So the problem is : i start my job with some fixed number of executors, but
when a host running my executors goes unreachable, driver reduces the total
number of executors. And never increases it.

I have a repro for the issue, attaching logs:
!!!! Running spark job is configured for 2 executors, dynamic allocation
not enabled !!!!!!!

AM starts requesting the 2 executors:
15/10/19 12:25:58 INFO yarn.YarnRMClient: Registering the ApplicationMaster
15/10/19 12:25:59 INFO yarn.YarnAllocator: Will request 2 executor
containers, each with 1 cores and 1408 MB memory including 384 MB overhead
15/10/19 12:25:59 INFO yarn.YarnAllocator: Container request (host: Any,
capability: <memory:1408, vCores:1>)
15/10/19 12:25:59 INFO yarn.YarnAllocator: Container request (host: Any,
capability: <memory:1408, vCores:1>)
15/10/19 12:25:59 INFO yarn.ApplicationMaster: Started progress reporter
thread - sleep time : 5000

Executors launched:
15/10/19 12:26:04 INFO impl.AMRMClientImpl: Received new token for :
DN-2:58739
15/10/19 12:26:04 INFO impl.AMRMClientImpl: Received new token for :
DN-1:44591
15/10/19 12:26:04 INFO yarn.YarnAllocator: Launching container
container_1444841612643_0014_01_000002 for on host DN-2
15/10/19 12:26:04 INFO yarn.YarnAllocator: Launching ExecutorRunnable.
driverUrl: akka.tcp://sparkDriver@NN-1:35115/user/CoarseGrainedScheduler,
executorHostname: DN-2
15/10/19 12:26:04 INFO yarn.YarnAllocator: Launching container
container_1444841612643_0014_01_000003 for on host DN-1
15/10/19 12:26:04 INFO yarn.YarnAllocator: Launching ExecutorRunnable.
driverUrl: akka.tcp://sparkDriver@NN-1:35115/user/CoarseGrainedScheduler,
executorHostname: DN-1

Now my AM and executor 1 are running on DN-2, DN-1 has executor 2 running
on it. To reproduce this issue I removed IP from DN-1, until it was timed
out by spark.
15/10/19 13:03:30 INFO yarn.YarnAllocator: Driver requested a total number
of 1 executor(s).
15/10/19 13:03:30 INFO yarn.ApplicationMaster: Driver requested to kill
executor(s) 2.


So the driver has reduced the total number of executor to : 1
And now even when the DN comes up and rejoins the cluster, this count is
not increased.
If I had executor 1 running on a separate DN (not the same as AM's DN), and
that DN went unreachable, driver would reduce total number of executor to :
0 and the job hangs forever. And this is when i have not enabled Dynamic
allocation. My cluster has other DN's available, AM should request the
killed executors from yarn, and get it on some other DN's.

Regards,
Prakhar


On Mon, Oct 19, 2015 at 2:47 PM, Saisai Shao <sa...@gmail.com> wrote:

> This is a deliberate killing request by heartbeat mechanism, have nothing
> to do with dynamic allocation. Here because you're running on yarn mode, so
> "supportDynamicAllocation" will be true, but actually there's no relation
> to dynamic allocation.
>
> From my understanding "doRequestTotalExecutors" is to sync the current
> total executor number with AM, AM will try to cancel some pending container
> requests when current expected executor number is less. The actual
> container killing command is issued by "doRequestTotalExecutors".
>
> Not sure what is your actual problem? is it unexpected?
>
> Thanks
> Saisai
>
>
> On Mon, Oct 19, 2015 at 3:51 PM, prakhar jauhari <pr...@gmail.com>
> wrote:
>
>> Hey all,
>>
>> Thanks in advance. I ran into a situation where spark driver reduced the
>> total executors count for my job even with dynamic allocation disabled,
>> and
>> caused the job to hang for ever.
>>
>> Setup:
>> Spark-1.3.1 on hadoop-yarn-2.4.0 cluster.
>> All servers in cluster running Linux version 2.6.32.
>> Job in yarn-client mode.
>>
>> Scenario:
>> 1. Application running with required number of executors.
>> 2. One of the DN's losses connectivity and is timed out.
>> 2. Spark issues a killExecutor for the executor on the DN which was timed
>> out.
>> 3. Even with dynamic allocation off, spark's driver reduces the
>> "targetNumExecutors".
>>
>> On analysing the code (Spark 1.3.1):
>>
>> When my DN goes unreachable:
>>
>> Spark core's HeartbeatReceiver invokes expireDeadHosts(): which checks if
>> Dynamic Allocation is supported and then invokes "sc.killExecutor()"
>>
>>         /if (sc.supportDynamicAllocation) {
>>                 sc.killExecutor(executorId)
>>         }/
>>
>> Surprisingly supportDynamicAllocation in sparkContext.scala is defined as,
>> resulting "True" if dynamicAllocationTesting flag is enabled or spark is
>> running over "yarn".
>>
>> /private[spark] def supportDynamicAllocation =
>>                     master.contains("yarn") || dynamicAllocationTesting /
>>
>> "sc.killExecutor()" matches it to configured "schedulerBackend"
>> (CoarseGrainedSchedulerBackend in this case) and invokes
>> "killExecutors(executorIds)"
>>
>> CoarseGrainedSchedulerBackend calculates a "newTotal" for the total number
>> of executors required, and sends a update to application master by
>> invoking
>> "doRequestTotalExecutors(newTotal)"
>>
>> CoarseGrainedSchedulerBackend then invokes a
>> "doKillExecutors(filteredExecutorIds)" for the lost executors.
>>
>> Thus reducing the total number of executors in a host intermittently
>> unreachable scenario.
>>
>>
>> I noticed that this change to "CoarseGrainedSchedulerBackend" was
>> introduced
>> while fixing :  https://issues.apache.org/jira/browse/SPARK-6325
>> <https://issues.apache.org/jira/browse/SPARK-6325>
>>
>>
>>
>> I am new to this code, If any of you could comment on why do we need
>> "doRequestTotalExecutors" in "killExecutors" would be a great help. Also
>> why
>> do we have "supportDynamicAllocation" = True even if i have not enabled
>> dynamic allocation.
>>
>> Regards,
>> Prakhar.
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-developers-list.1001551.n3.nabble.com/Spark-driver-reducing-total-executors-count-even-when-Dynamic-Allocation-is-disabled-tp14679.html
>> Sent from the Apache Spark Developers List mailing list archive at
>> Nabble.com.
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: dev-unsubscribe@spark.apache.org
>> For additional commands, e-mail: dev-help@spark.apache.org
>>
>>
>

Re: Spark driver reducing total executors count even when Dynamic Allocation is disabled.

Posted by Saisai Shao <sa...@gmail.com>.
This is a deliberate killing request by heartbeat mechanism, have nothing
to do with dynamic allocation. Here because you're running on yarn mode, so
"supportDynamicAllocation" will be true, but actually there's no relation
to dynamic allocation.

>From my understanding "doRequestTotalExecutors" is to sync the current
total executor number with AM, AM will try to cancel some pending container
requests when current expected executor number is less. The actual
container killing command is issued by "doRequestTotalExecutors".

Not sure what is your actual problem? is it unexpected?

Thanks
Saisai


On Mon, Oct 19, 2015 at 3:51 PM, prakhar jauhari <pr...@gmail.com> wrote:

> Hey all,
>
> Thanks in advance. I ran into a situation where spark driver reduced the
> total executors count for my job even with dynamic allocation disabled, and
> caused the job to hang for ever.
>
> Setup:
> Spark-1.3.1 on hadoop-yarn-2.4.0 cluster.
> All servers in cluster running Linux version 2.6.32.
> Job in yarn-client mode.
>
> Scenario:
> 1. Application running with required number of executors.
> 2. One of the DN's losses connectivity and is timed out.
> 2. Spark issues a killExecutor for the executor on the DN which was timed
> out.
> 3. Even with dynamic allocation off, spark's driver reduces the
> "targetNumExecutors".
>
> On analysing the code (Spark 1.3.1):
>
> When my DN goes unreachable:
>
> Spark core's HeartbeatReceiver invokes expireDeadHosts(): which checks if
> Dynamic Allocation is supported and then invokes "sc.killExecutor()"
>
>         /if (sc.supportDynamicAllocation) {
>                 sc.killExecutor(executorId)
>         }/
>
> Surprisingly supportDynamicAllocation in sparkContext.scala is defined as,
> resulting "True" if dynamicAllocationTesting flag is enabled or spark is
> running over "yarn".
>
> /private[spark] def supportDynamicAllocation =
>                     master.contains("yarn") || dynamicAllocationTesting /
>
> "sc.killExecutor()" matches it to configured "schedulerBackend"
> (CoarseGrainedSchedulerBackend in this case) and invokes
> "killExecutors(executorIds)"
>
> CoarseGrainedSchedulerBackend calculates a "newTotal" for the total number
> of executors required, and sends a update to application master by invoking
> "doRequestTotalExecutors(newTotal)"
>
> CoarseGrainedSchedulerBackend then invokes a
> "doKillExecutors(filteredExecutorIds)" for the lost executors.
>
> Thus reducing the total number of executors in a host intermittently
> unreachable scenario.
>
>
> I noticed that this change to "CoarseGrainedSchedulerBackend" was
> introduced
> while fixing :  https://issues.apache.org/jira/browse/SPARK-6325
> <https://issues.apache.org/jira/browse/SPARK-6325>
>
>
>
> I am new to this code, If any of you could comment on why do we need
> "doRequestTotalExecutors" in "killExecutors" would be a great help. Also
> why
> do we have "supportDynamicAllocation" = True even if i have not enabled
> dynamic allocation.
>
> Regards,
> Prakhar.
>
>
>
>
> --
> View this message in context:
> http://apache-spark-developers-list.1001551.n3.nabble.com/Spark-driver-reducing-total-executors-count-even-when-Dynamic-Allocation-is-disabled-tp14679.html
> Sent from the Apache Spark Developers List mailing list archive at
> Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: dev-unsubscribe@spark.apache.org
> For additional commands, e-mail: dev-help@spark.apache.org
>
>