Posted to dev@spark.apache.org by Mich Talebzadeh <mi...@gmail.com> on 2022/02/05 09:03:59 UTC

spark, autoscaling and handling node loss with autoscaling

This question arises when Spark is offered as a managed service on a
cluster of VMs in the cloud, for example Google Dataproc
<https://cloud.google.com/dataproc> or Amazon EMR
<https://aws.amazon.com/emr/>, among others.

From what I can see in the autoscaling setup, you always need a minimum of
two primary worker nodes. The documentation also states, and I quote, "Scaling
primary workers is not recommended due to HDFS limitations which result in
instability while scaling. These limitations do not exist for secondary
workers." So scaling is done through the secondary workers, by specifying the
minimum and maximum number of instances. It also defaults to 2 minutes for the
so-called autoscaling cooldown duration, presumably to bring new executors
online. My assumption is that task allocation to the new executors is FIFO
for new tasks. This link
<https://docs.qubole.com/en/latest/admin-guide/engine-admin/spark-admin/autoscale-spark.html#:~:text=dynamic%20allocation%20configurations.-,Autoscaling%20in%20Spark%20Clusters,scales%20down%20towards%20the%20minimum.&text=By%20default%2C%20Spark%20uses%20a%20static%20allocation%20of%20resources.>
gives some explanation of autoscaling.
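
For concreteness, below is a minimal sketch of what such a Dataproc
autoscaling policy might look like. The policy and cluster names and the
region are placeholders, and the numbers are only illustrative (they mirror
the 2-minute cooldown default mentioned above); only the secondary workers
are allowed to scale:

    # autoscaling-policy.yaml -- primary workers pinned at 2, secondary
    # workers scale between 0 and 10, with a 2-minute cooldown period.
    workerConfig:
      minInstances: 2
      maxInstances: 2
    secondaryWorkerConfig:
      minInstances: 0
      maxInstances: 10
    basicAlgorithm:
      cooldownPeriod: 2m
      yarnConfig:
        scaleUpFactor: 0.05
        scaleDownFactor: 1.0
        gracefulDecommissionTimeout: 1h

    # Register the policy and attach it to an existing cluster:
    gcloud dataproc autoscaling-policies import my-autoscaling-policy \
        --source=autoscaling-policy.yaml --region=my-region
    gcloud dataproc clusters update my-cluster \
        --autoscaling-policy=my-autoscaling-policy --region=my-region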

Handling Spot Node Loss in Spark Clusters
When the Spark YARN Application Master (AM) receives the spot loss
notification from the YARN Resource Manager (RM), it notifies the Spark
driver. The driver then performs the following actions:

   1. Identifies all the executors affected by the upcoming node loss.
   2. Moves all of the affected executors to a decommissioning state, so
   that no new tasks are scheduled on these executors.
   3. Kills all of the affected executors after reaching 50% of the
   termination time.
   4. Restarts the failed tasks on the remaining executors.
   5. Removes all entries for the shuffle data held on these nodes from the
   map output tracker on the driver after reaching 90% of the termination
   time. This helps prevent shuffle-fetch failures due to the spot loss.
   6. Recomputes the shuffle data lost with the node by resubmitting the
   affected stages, regenerating that node's shuffle data where required.
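
As an aside, open-source Spark 3.1+ exposes a related graceful
decommissioning mechanism that can be switched on through configuration.
This is not necessarily what the managed services do under the hood; a
minimal sketch, with a hypothetical application name:

    # Ask Spark to decommission executors gracefully and migrate their
    # shuffle and cached RDD blocks before the node goes away (Spark 3.1+).
    spark-submit \
       --conf spark.decommission.enabled=true \
       --conf spark.storage.decommission.enabled=true \
       --conf spark.storage.decommission.shuffleBlocks.enabled=true \
       --conf spark.storage.decommission.rddBlocks.enabled=true \
       my_app.py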

My conclusion is that when a node fails, classic Spark fault tolerance comes
into play: no new nodes are added even with autoscaling enabled, and the
failed tasks are redistributed among the existing executors.


So basically, autoscaling does not deal with failed nodes?


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.

Re: spark, autoscaling and handling node loss with autoscaling

Posted by Mich Talebzadeh <mi...@gmail.com>.
I did some tests on a three-node Dataproc cluster with autoscaling on: one
master node and two worker nodes. The master node was called ctpcluster-m and
the worker nodes were ctpcluster-w-0 and ctpcluster-w-1 respectively.

I started a spark-submit job with the following autoscaling parameters added

        spark-submit --verbose \
           --deploy-mode client \
           --conf "spark.yarn.appMasterEnv.SPARK_HOME=$SPARK_HOME" \
           --conf "spark.yarn.appMasterEnv.PYTHONPATH=${PYTHONPATH}" \
           --conf "spark.executorEnv.PYTHONPATH=${PYTHONPATH}" \
           --py-files $CODE_DIRECTORY_CLOUD/spark_on_gke.zip \
           --conf "spark.driver.memory"=4G \
           --conf "spark.executor.memory"=4G \
           --conf "spark.num.executors"=4 \
           --conf "spark.executor.cores"=2 \
           --conf spark.dynamicAllocation.enabled="true" \
           --conf spark.shuffle.service.enabled="true" \
           --conf spark.dynamicAllocation.minExecutors=2 \
           --conf spark.dynamicAllocation.maxExecutors=10 \
           --conf spark.dynamicAllocation.initialExecutors=4 \
           $CODE_DIRECTORY_CLOUD/${APPLICATION}

(spark.dynamicAllocation.initialExecutors is set to the same value as
spark.num.executors, i.e. 4.)

Once I started the submit job, I shut down the ctpcluster-w-1 node
immediately. These were the diagnostics thrown out:

22/02/05 18:37:12 INFO
org.apache.hadoop.yarn.client.api.impl.YarnClientImpl: Submitted
application application_1644085520369_0003
22/02/05 18:37:13 INFO org.apache.hadoop.yarn.client.RMProxy: Connecting to
ResourceManager at ctpcluster-m/10.154.15.193:8030

Started at
05/02/2022 18:37:18.18
22/02/05 18:37:26 WARN
org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint:
Requesting driver to remove executor 3 for reason Container marked as
failed: container_1644085520369_0003_01_000003 on host:
ctpcluster-w-1.europe-west2-c.c.xxx.internal. Exit status: -100.
Diagnostics: Container released on a lost node.
22/02/05 18:37:26 ERROR org.apache.spark.scheduler.cluster.YarnScheduler: Lost
executor 3 on ctpcluster-w-1.europe-west2-c.c.xxx.internal: Container
marked as failed: container_1644085520369_0003_01_000003 on host:
ctpcluster-w-1.europe-west2-c.c.xxx.internal. Exit status: -100.
Diagnostics: Container released on a lost node.
22/02/05 18:37:26 ERROR org.apache.spark.scheduler.cluster.YarnScheduler: Lost
executor 1 on ctpcluster-w-1.europe-west2-c.c.xxx.internal: Container
marked as failed: container_1644085520369_0003_01_000001 on host:
ctpcluster-w-1.europe-west2-c.c.xxx.internal. Exit status: -100.
Diagnostics: Container released on a lost node.
22/02/05 18:37:26 WARN
org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint:
Requesting driver to remove executor 1 for reason Container marked as
failed: container_1644085520369_0003_01_000001 on host:
ctpcluster-w-1.europe-west2-c.c.xxx.internal. Exit status: -100.
Diagnostics: Container released on a lost node.

So basically, two of the original four containers were lost, as they were on
the lost node. There was no attempt to autoscale to replace the lost worker
node. The job was executed on the remaining two containers on
ctpcluster-w-0.
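
One way to confirm this from the application side is Spark's monitoring REST
API, which lists both active and removed executors. A minimal sketch, where
the driver host and application id are placeholders (port 4040 is the default
UI port in client mode):

    # Active executors only:
    curl -s http://<driver-host>:4040/api/v1/applications/<app-id>/executors
    # All executors, including those already removed with the lost node:
    curl -s http://<driver-host>:4040/api/v1/applications/<app-id>/allexecutors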

My conclusion is that autoscaling only applies to a workload in a clean
state: it adds or removes secondary workers in response to pending load, not
in response to node failure.

HTH



   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



