You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Maksim Grinman <ma...@resolute.ai> on 2022/02/03 21:20:42 UTC

Spark 3.1.2 full thread dumps

We've got a spark task that, after some processing, starts an autoscaling
group and waits for it to be up before continuing processing. While waiting
for the autoscaling group, spark starts throwing full thread dumps,
presumably at the spark.executor.heartbeat interval. Is there a way to
prevent the thread dumps?

-- 
Maksim Grinman
VP Engineering
Resolute AI

Re: Spark 3.1.2 full thread dumps

Posted by Mich Talebzadeh <mi...@gmail.com>.
Indeed. Apologies for going on a tangent.



   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Sat, 5 Feb 2022 at 01:46, Maksim Grinman <ma...@resolute.ai> wrote:

> Not that this discussion is not interesting (it is), but this has strayed
> pretty far from my original question. Which was: How do I prevent spark
> from dumping huge Java Full Thread dumps when an executor appears to not be
> doing anything (in my case, there's a loop where it sleeps waiting for a
> service to come up). The service happens to be set up using an auto-scaling
> group, a coincidental and unimportant detail that seems to have derailed
> the conversation.
>
> On Fri, Feb 4, 2022 at 7:18 PM Mich Talebzadeh <mi...@gmail.com>
> wrote:
>
>> OK basically, do we have a scenario where Spark or for that matter any
>> cluster manager can deploy a new node (after the loss of  an existing node)
>> with the view of running the failed tasks on the new executor(s) deployed
>> on that newly spun node?
>>
>>
>>
>>    view my Linkedin profile
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Sat, 5 Feb 2022 at 00:00, Holden Karau <ho...@pigscanfly.ca> wrote:
>>
>>> We don’t block scaling up after node failure in classic Spark if that’s
>>> the question.
>>>
>>> On Fri, Feb 4, 2022 at 6:30 PM Mich Talebzadeh <
>>> mich.talebzadeh@gmail.com> wrote:
>>>
>>>> From what I can see in auto scaling setup, you will always need a min
>>>> of two worker nodes as primary. It also states and I quote "Scaling
>>>> primary workers is not recommended due to HDFS limitations which result in
>>>> instability while scaling. These limitations do not exist for secondary
>>>> workers". So the scaling comes with the secondary workers specifying the
>>>> min and max instances. It also defaults to 2 minutes for the so-called auto
>>>> scaling cooldown duration hence that delay observed. I presume task
>>>> allocation to the new executors is FIFO for new tasks. This link
>>>> <https://docs.qubole.com/en/latest/admin-guide/engine-admin/spark-admin/autoscale-spark.html#:~:text=dynamic%20allocation%20configurations.-,Autoscaling%20in%20Spark%20Clusters,scales%20down%20towards%20the%20minimum.&text=By%20default%2C%20Spark%20uses%20a%20static%20allocation%20of%20resources.>
>>>> does some explanation on autoscaling.
>>>>
>>>> Handling Spot Node Loss and Spot Blocks in Spark Clusters
>>>> "When the Spark AM receives the spot loss (Spot Node Loss or Spot
>>>> Blocks) notification from the RM, it notifies the Spark driver. The driver
>>>> then performs the following actions:
>>>>
>>>>    1. Identifies all the executors affected by the upcoming node loss.
>>>>    2. Moves all of the affected executors to a decommissioning state,
>>>>    and no new tasks are scheduled on these executors.
>>>>    3. Kills all the executors after reaching 50% of the termination
>>>>    time.
>>>>    4. *Starts the failed tasks (if any) on other executors.*
>>>>    5. For these nodes, it removes all the entries of the shuffle data
>>>>    from the map output tracker on driver after reaching 90% of the termination
>>>>    time. This helps in preventing the shuffle-fetch failures due to spot loss.
>>>>    6. Recomputes the shuffle data from the lost node by stage
>>>>    resubmission and at the time shuffles data of spot node if required."
>>>>    7.
>>>>    8. So basically when a node fails classic spark comes into play and
>>>>    no new nodes are added etc (no rescaling) and tasks are redistributed among
>>>>    the existing executors as I read it?
>>>>
>>>>    view my Linkedin profile
>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>
>>>>
>>>>
>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>> any loss, damage or destruction of data or any other property which may
>>>> arise from relying on this email's technical content is explicitly
>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>> arising from such loss, damage or destruction.
>>>>
>>>>
>>>>
>>>>
>>>> On Fri, 4 Feb 2022 at 13:55, Sean Owen <sr...@gmail.com> wrote:
>>>>
>>>>> I have not seen stack traces under autoscaling, so not even sure what
>>>>> the error in question is.
>>>>> There is always delay in acquiring a whole new executor in the cloud
>>>>> as it usually means a new VM is provisioned.
>>>>> Spark treats the new executor like any other, available for executing
>>>>> tasks.
>>>>>
>>>>> On Fri, Feb 4, 2022 at 4:28 AM Mich Talebzadeh <
>>>>> mich.talebzadeh@gmail.com> wrote:
>>>>>
>>>>>> Thanks for the info.
>>>>>>
>>>>>> My concern has always been on how Spark handles autoscaling (adding
>>>>>> new executors) when the load pattern changes.I have tried to test this with
>>>>>> setting the following parameters (Spark 3.1.2 on GCP)
>>>>>>
>>>>>>         spark-submit --verbose \
>>>>>>         .......
>>>>>>           --conf spark.dynamicAllocation.enabled="true" \
>>>>>>            --conf spark.shuffle.service.enabled="true" \
>>>>>>            --conf spark.dynamicAllocation.minExecutors=2 \
>>>>>>            --conf spark.dynamicAllocation.maxExecutors=10 \
>>>>>>            --conf spark.dynamicAllocation.initialExecutors=4 \
>>>>>>
>>>>>> It is not very clear to me how Spark distributes tasks on the added
>>>>>> executors and the source of delay. As you have observed there is a delay in
>>>>>> adding new resources and allocating tasks. If that process is efficient?
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>>    view my Linkedin profile
>>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>>
>>>>>>
>>>>>>
>>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility
>>>>>> for any loss, damage or destruction of data or any other property which may
>>>>>> arise from relying on this email's technical content is explicitly
>>>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>>>> arising from such loss, damage or destruction.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, 4 Feb 2022 at 03:04, Maksim Grinman <ma...@resolute.ai> wrote:
>>>>>>
>>>>>>> It's actually on AWS EMR. The job bootstraps and runs fine -- the
>>>>>>> autoscaling group is to bring up a service that spark will be calling. Some
>>>>>>> code waits for the autoscaling group to come up before continuing
>>>>>>> processing in Spark, since the Spark cluster will need to make requests to
>>>>>>> the service in the autoscaling group. It takes several minutes for the
>>>>>>> service to come up, and during the wait, Spark starts to show these thread
>>>>>>> dumps, as presumably it thinks something is wrong since the executor is
>>>>>>> busy waiting and not doing anything. The previous version of Spark did not
>>>>>>> do this (2.4.4).
>>>>>>>
>>>>>>> On Thu, Feb 3, 2022 at 6:59 PM Mich Talebzadeh <
>>>>>>> mich.talebzadeh@gmail.com> wrote:
>>>>>>>
>>>>>>>> Sounds like you are running this on Google Dataproc cluster (spark
>>>>>>>> 3.1.2)  with auto scaling policy?
>>>>>>>>
>>>>>>>>  Can you describe if this happens before Spark starts a new job on
>>>>>>>> the cluster or somehow half way through processing an existing job?
>>>>>>>>
>>>>>>>> Also is the job involved doing Spark Structured Streaming?
>>>>>>>>
>>>>>>>> HTH
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>    view my Linkedin profile
>>>>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility
>>>>>>>> for any loss, damage or destruction of data or any other property which may
>>>>>>>> arise from relying on this email's technical content is explicitly
>>>>>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>>>>>> arising from such loss, damage or destruction.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, 3 Feb 2022 at 21:29, Maksim Grinman <ma...@resolute.ai>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> We've got a spark task that, after some processing, starts an
>>>>>>>>> autoscaling group and waits for it to be up before continuing processing.
>>>>>>>>> While waiting for the autoscaling group, spark starts throwing full thread
>>>>>>>>> dumps, presumably at the spark.executor.heartbeat interval. Is there a way
>>>>>>>>> to prevent the thread dumps?
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Maksim Grinman
>>>>>>>>> VP Engineering
>>>>>>>>> Resolute AI
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Maksim Grinman
>>>>>>> VP Engineering
>>>>>>> Resolute AI
>>>>>>>
>>>>>> --
>>> Twitter: https://twitter.com/holdenkarau
>>> Books (Learning Spark, High Performance Spark, etc.):
>>> https://amzn.to/2MaRAG9  <https://amzn.to/2MaRAG9>
>>> YouTube Live Streams: https://www.youtube.com/user/holdenkarau
>>>
>>
>
> --
> Maksim Grinman
> VP Engineering
> Resolute AI
>

Re: Spark 3.1.2 full thread dumps

Posted by "Lalwani, Jayesh" <jl...@amazon.com.INVALID>.
This (https://www.elastic.co/blog/benchmarking-and-sizing-your-elasticsearch-cluster-for-logs-and-metrics) has the math for sizing the cluster. There is a similar document (https://docs.aws.amazon.com/opensearch-service/latest/developerguide/sizing-domains.html) on sizing your cluster on AWS.

10MB per bulk api call is what elastic search recommends. I’m not sure how they came up with the number

Elastic search gives you the size of each index. I would load 10K records into ES, look at size of index, divide by 10K to roughly get the size of each row. This won’t give you an exact figure, but it will be in the ballpark.

From: Maksim Grinman <ma...@resolute.ai>
Date: Friday, February 11, 2022 at 2:21 PM
To: "Lalwani, Jayesh" <jl...@amazon.com>
Cc: Mich Talebzadeh <mi...@gmail.com>, Holden Karau <ho...@pigscanfly.ca>, Sean Owen <sr...@gmail.com>, "user @spark" <us...@spark.apache.org>
Subject: RE: [EXTERNAL] Spark 3.1.2 full thread dumps


CAUTION: This email originated from outside of the organization. Do not click links or open attachments unless you can confirm the sender and know the content is safe.


Thanks for these suggestions. Regarding hot nodes, are you referring to the same as in this article? https://www.elastic.co/blog/hot-warm-architecture-in-elasticsearch-5-x.
I am also curious where the 10MB heuristic came from, though I have heard a similar heuristic with respect to the size of a partition. I suspect the best way to see the size of a partition is simply to write to parquet and observe the size of the written parquet partitions?

Thanks

On Fri, Feb 11, 2022 at 12:48 PM Lalwani, Jayesh <jl...@amazon.com>> wrote:
You can probably tune writing to elastic search by

  1.  Increasing number of partitions so you are writing smaller batches of rows to elastic search
  2.  Using Elastic search’s bulk api
  3.  Scaling up the number of hot nodes on elastic search cluster to support writing in parallel.

You want to minimize long running tasks. Not just to avoid the “thread dump”. Large number of shorter running tasks are better than Small number of long running tasks, because you can scale up your processing by throwing hardware at it. This is subject to law of diminishing returns; ie; at some point making your tasks smaller will start slowing you down. You need to find the sweet spot.

Generally for elastic search, the sweet spot is that each task writes around 10MB of data using the bulk API. Writing 10MB of data per task should be take order of few seconds. You won’t get the dreaded thread dump if your tasks are taking few seconds

From: Maksim Grinman <ma...@resolute.ai>>
Date: Thursday, February 10, 2022 at 7:21 PM
To: "Lalwani, Jayesh" <jl...@amazon.com>>
Cc: Mich Talebzadeh <mi...@gmail.com>>, Holden Karau <ho...@pigscanfly.ca>>, Sean Owen <sr...@gmail.com>>, "user @spark" <us...@spark.apache.org>>
Subject: RE: [EXTERNAL] Spark 3.1.2 full thread dumps


CAUTION: This email originated from outside of the organization. Do not click links or open attachments unless you can confirm the sender and know the content is safe.


That's fair, but I do get the same thread dump at the last step of the spark job, where we write the final dataframe into an elasticsearch index. It's a df.rdd.map(lambda r: r.asDict(True)).foreachPartition operation which takes a while and we usually get a thread dump during that as well.

On Mon, Feb 7, 2022 at 11:24 AM Lalwani, Jayesh <jl...@amazon.com>> wrote:
Probably not the answer you are looking for, but the best thing to do is to avoid making Spark code sleep. Is there a way you can predict how big your autoscaling group needs to be without looking at all the data? Are you using fixed number of Spark executors or are you have some way of scaling your executors? I am guessing that the size of your autoscaling group is proportional to the number of Spark executors. You can probably measure how many executors each can support. Then you can tie in the size of your autoscaling group to the number of executors.

Alternatively, you can build your service so a) it autoscales as load increases b) throttle requests when the load is higher than it can manage now. This means that when Spark executors start hitting your nodes, your service will throttle many of the requests, and start autoscaling up. Note that this is an established pattern in the cloud. This is how most services on AWS work. The end result is that initially there will be higher latency due to cold start, but the system will catch up eventually

From: Maksim Grinman <ma...@resolute.ai>>
Date: Friday, February 4, 2022 at 9:35 PM
To: Mich Talebzadeh <mi...@gmail.com>>
Cc: Holden Karau <ho...@pigscanfly.ca>>, Sean Owen <sr...@gmail.com>>, "user @spark" <us...@spark.apache.org>>
Subject: RE: [EXTERNAL] Spark 3.1.2 full thread dumps


CAUTION: This email originated from outside of the organization. Do not click links or open attachments unless you can confirm the sender and know the content is safe.


Not that this discussion is not interesting (it is), but this has strayed pretty far from my original question. Which was: How do I prevent spark from dumping huge Java Full Thread dumps when an executor appears to not be doing anything (in my case, there's a loop where it sleeps waiting for a service to come up). The service happens to be set up using an auto-scaling group, a coincidental and unimportant detail that seems to have derailed the conversation.

On Fri, Feb 4, 2022 at 7:18 PM Mich Talebzadeh <mi...@gmail.com>> wrote:
OK basically, do we have a scenario where Spark or for that matter any cluster manager can deploy a new node (after the loss of  an existing node) with the view of running the failed tasks on the new executor(s) deployed on that newly spun node?




 Error! Filename not specified.  view my Linkedin profile<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.




On Sat, 5 Feb 2022 at 00:00, Holden Karau <ho...@pigscanfly.ca>> wrote:
We don’t block scaling up after node failure in classic Spark if that’s the question.

On Fri, Feb 4, 2022 at 6:30 PM Mich Talebzadeh <mi...@gmail.com>> wrote:
From what I can see in auto scaling setup, you will always need a min of two worker nodes as primary. It also states and I quote "Scaling primary workers is not recommended due to HDFS limitations which result in instability while scaling. These limitations do not exist for secondary workers". So the scaling comes with the secondary workers specifying the min and max instances. It also defaults to 2 minutes for the so-called auto scaling cooldown duration hence that delay observed. I presume task allocation to the new executors is FIFO for new tasks. This link<https://docs.qubole.com/en/latest/admin-guide/engine-admin/spark-admin/autoscale-spark.html#:~:text=dynamic%20allocation%20configurations.-,Autoscaling%20in%20Spark%20Clusters,scales%20down%20towards%20the%20minimum.&text=By%20default%2C%20Spark%20uses%20a%20static%20allocation%20of%20resources.> does some explanation on autoscaling.

Handling Spot Node Loss and Spot Blocks in Spark Clusters
"When the Spark AM receives the spot loss (Spot Node Loss or Spot Blocks) notification from the RM, it notifies the Spark driver. The driver then performs the following actions:
1.    Identifies all the executors affected by the upcoming node loss.
2.    Moves all of the affected executors to a decommissioning state, and no new tasks are scheduled on these executors.
3.    Kills all the executors after reaching 50% of the termination time.
4.    Starts the failed tasks (if any) on other executors.
5.    For these nodes, it removes all the entries of the shuffle data from the map output tracker on driver after reaching 90% of the termination time. This helps in preventing the shuffle-fetch failures due to spot loss.
6.    Recomputes the shuffle data from the lost node by stage resubmission and at the time shuffles data of spot node if required."

  1.
  2.  So basically when a node fails classic spark comes into play and no new nodes are added etc (no rescaling) and tasks are redistributed among the existing executors as I read it?
 Error! Filename not specified.  view my Linkedin profile<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.




On Fri, 4 Feb 2022 at 13:55, Sean Owen <sr...@gmail.com>> wrote:
I have not seen stack traces under autoscaling, so not even sure what the error in question is.
There is always delay in acquiring a whole new executor in the cloud as it usually means a new VM is provisioned.
Spark treats the new executor like any other, available for executing tasks.

On Fri, Feb 4, 2022 at 4:28 AM Mich Talebzadeh <mi...@gmail.com>> wrote:
Thanks for the info.

My concern has always been on how Spark handles autoscaling (adding new executors) when the load pattern changes.I have tried to test this with setting the following parameters (Spark 3.1.2 on GCP)

        spark-submit --verbose \
        .......
          --conf spark.dynamicAllocation.enabled="true" \
           --conf spark.shuffle.service.enabled="true" \
           --conf spark.dynamicAllocation.minExecutors=2 \
           --conf spark.dynamicAllocation.maxExecutors=10 \
           --conf spark.dynamicAllocation.initialExecutors=4 \

It is not very clear to me how Spark distributes tasks on the added executors and the source of delay. As you have observed there is a delay in adding new resources and allocating tasks. If that process is efficient?

Thanks

 Error! Filename not specified.  view my Linkedin profile<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.




On Fri, 4 Feb 2022 at 03:04, Maksim Grinman <ma...@resolute.ai>> wrote:
It's actually on AWS EMR. The job bootstraps and runs fine -- the autoscaling group is to bring up a service that spark will be calling. Some code waits for the autoscaling group to come up before continuing processing in Spark, since the Spark cluster will need to make requests to the service in the autoscaling group. It takes several minutes for the service to come up, and during the wait, Spark starts to show these thread dumps, as presumably it thinks something is wrong since the executor is busy waiting and not doing anything. The previous version of Spark did not do this (2.4.4).

On Thu, Feb 3, 2022 at 6:59 PM Mich Talebzadeh <mi...@gmail.com>> wrote:
Sounds like you are running this on Google Dataproc cluster (spark 3.1.2)  with auto scaling policy?

 Can you describe if this happens before Spark starts a new job on the cluster or somehow half way through processing an existing job?

Also is the job involved doing Spark Structured Streaming?

HTH




 Error! Filename not specified.  view my Linkedin profile<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.




On Thu, 3 Feb 2022 at 21:29, Maksim Grinman <ma...@resolute.ai>> wrote:
We've got a spark task that, after some processing, starts an autoscaling group and waits for it to be up before continuing processing. While waiting for the autoscaling group, spark starts throwing full thread dumps, presumably at the spark.executor.heartbeat interval. Is there a way to prevent the thread dumps?

--
Maksim Grinman
VP Engineering
Resolute AI


--
Maksim Grinman
VP Engineering
Resolute AI
--
Twitter: https://twitter.com/holdenkarau
Books (Learning Spark, High Performance Spark, etc.): https://amzn.to/2MaRAG9 <https://amzn.to/2MaRAG9>
YouTube Live Streams: https://www.youtube.com/user/holdenkarau


--
Maksim Grinman
VP Engineering
Resolute AI


--
Maksim Grinman
VP Engineering
Resolute AI


--
Maksim Grinman
VP Engineering
Resolute AI

Re: Spark 3.1.2 full thread dumps

Posted by Maksim Grinman <ma...@resolute.ai>.
Thanks for these suggestions. Regarding hot nodes, are you referring to the
same as in this article?
https://www.elastic.co/blog/hot-warm-architecture-in-elasticsearch-5-x.
I am also curious where the 10MB heuristic came from, though I have heard a
similar heuristic with respect to the size of a partition. I suspect the
best way to see the size of a partition is simply to write to parquet and
observe the size of the written parquet partitions?

Thanks

On Fri, Feb 11, 2022 at 12:48 PM Lalwani, Jayesh <jl...@amazon.com>
wrote:

> You can probably tune writing to elastic search by
>
>    1. Increasing number of partitions so you are writing smaller batches
>    of rows to elastic search
>    2. Using Elastic search’s bulk api
>    3. Scaling up the number of hot nodes on elastic search cluster to
>    support writing in parallel.
>
>
>
> You want to minimize long running tasks. Not just to avoid the “thread
> dump”. Large number of shorter running tasks are better than Small number
> of long running tasks, because you can scale up your processing by throwing
> hardware at it. This is subject to law of diminishing returns; ie; at some
> point making your tasks smaller will start slowing you down. You need to
> find the sweet spot.
>
>
>
> Generally for elastic search, the sweet spot is that each task writes
> around 10MB of data using the bulk API. Writing 10MB of data per task
> should be take order of few seconds. You won’t get the dreaded thread dump
> if your tasks are taking few seconds
>
>
>
> *From: *Maksim Grinman <ma...@resolute.ai>
> *Date: *Thursday, February 10, 2022 at 7:21 PM
> *To: *"Lalwani, Jayesh" <jl...@amazon.com>
> *Cc: *Mich Talebzadeh <mi...@gmail.com>, Holden Karau <
> holden@pigscanfly.ca>, Sean Owen <sr...@gmail.com>, "user @spark" <
> user@spark.apache.org>
> *Subject: *RE: [EXTERNAL] Spark 3.1.2 full thread dumps
>
>
>
> *CAUTION*: This email originated from outside of the organization. Do not
> click links or open attachments unless you can confirm the sender and know
> the content is safe.
>
>
>
> That's fair, but I do get the same thread dump at the last step of the
> spark job, where we write the final dataframe into an elasticsearch index.
> It's a df.rdd.map(lambda r: r.asDict(True)).foreachPartition operation
> which takes a while and we usually get a thread dump during that as well.
>
>
>
> On Mon, Feb 7, 2022 at 11:24 AM Lalwani, Jayesh <jl...@amazon.com>
> wrote:
>
> Probably not the answer you are looking for, but the best thing to do is
> to avoid making Spark code sleep. Is there a way you can predict how big
> your autoscaling group needs to be without looking at all the data? Are you
> using fixed number of Spark executors or are you have some way of scaling
> your executors? I am guessing that the size of your autoscaling group is
> proportional to the number of Spark executors. You can probably measure how
> many executors each can support. Then you can tie in the size of your
> autoscaling group to the number of executors.
>
>
>
> Alternatively, you can build your service so a) it autoscales as load
> increases b) throttle requests when the load is higher than it can manage
> now. This means that when Spark executors start hitting your nodes, your
> service will throttle many of the requests, and start autoscaling up. Note
> that this is an established pattern in the cloud. This is how most services
> on AWS work. The end result is that initially there will be higher latency
> due to cold start, but the system will catch up eventually
>
>
>
> *From: *Maksim Grinman <ma...@resolute.ai>
> *Date: *Friday, February 4, 2022 at 9:35 PM
> *To: *Mich Talebzadeh <mi...@gmail.com>
> *Cc: *Holden Karau <ho...@pigscanfly.ca>, Sean Owen <sr...@gmail.com>,
> "user @spark" <us...@spark.apache.org>
> *Subject: *RE: [EXTERNAL] Spark 3.1.2 full thread dumps
>
>
>
> *CAUTION*: This email originated from outside of the organization. Do not
> click links or open attachments unless you can confirm the sender and know
> the content is safe.
>
>
>
> Not that this discussion is not interesting (it is), but this has strayed
> pretty far from my original question. Which was: How do I prevent spark
> from dumping huge Java Full Thread dumps when an executor appears to not be
> doing anything (in my case, there's a loop where it sleeps waiting for a
> service to come up). The service happens to be set up using an auto-scaling
> group, a coincidental and unimportant detail that seems to have derailed
> the conversation.
>
>
>
> On Fri, Feb 4, 2022 at 7:18 PM Mich Talebzadeh <mi...@gmail.com>
> wrote:
>
> OK basically, do we have a scenario where Spark or for that matter any
> cluster manager can deploy a new node (after the loss of  an existing node)
> with the view of running the failed tasks on the new executor(s) deployed
> on that newly spun node?
>
>
>
>
>
>  *Error! Filename not specified.*  view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
>
>
>
> On Sat, 5 Feb 2022 at 00:00, Holden Karau <ho...@pigscanfly.ca> wrote:
>
> We don’t block scaling up after node failure in classic Spark if that’s
> the question.
>
>
>
> On Fri, Feb 4, 2022 at 6:30 PM Mich Talebzadeh <mi...@gmail.com>
> wrote:
>
> From what I can see in auto scaling setup, you will always need a min of
> two worker nodes as primary. It also states and I quote "Scaling primary
> workers is not recommended due to HDFS limitations which result in
> instability while scaling. These limitations do not exist for secondary
> workers". So the scaling comes with the secondary workers specifying the
> min and max instances. It also defaults to 2 minutes for the so-called auto
> scaling cooldown duration hence that delay observed. I presume task
> allocation to the new executors is FIFO for new tasks. This link
> <https://docs.qubole.com/en/latest/admin-guide/engine-admin/spark-admin/autoscale-spark.html#:~:text=dynamic%20allocation%20configurations.-,Autoscaling%20in%20Spark%20Clusters,scales%20down%20towards%20the%20minimum.&text=By%20default%2C%20Spark%20uses%20a%20static%20allocation%20of%20resources.>
> does some explanation on autoscaling.
>
>
> Handling Spot Node Loss and Spot Blocks in Spark Clusters
>
> "When the Spark AM receives the spot loss (Spot Node Loss or Spot Blocks)
> notification from the RM, it notifies the Spark driver. The driver then
> performs the following actions:
>
> 1.    Identifies all the executors affected by the upcoming node loss.
>
> 2.    Moves all of the affected executors to a decommissioning state, and
> no new tasks are scheduled on these executors.
>
> 3.    Kills all the executors after reaching 50% of the termination time.
>
> 4.    *Starts the failed tasks (if any) on other executors.*
>
> 5.    For these nodes, it removes all the entries of the shuffle data
> from the map output tracker on driver after reaching 90% of the termination
> time. This helps in preventing the shuffle-fetch failures due to spot loss.
>
> 6.    Recomputes the shuffle data from the lost node by stage
> resubmission and at the time shuffles data of spot node if required."
>
>    1.
>    2. So basically when a node fails classic spark comes into play and no
>    new nodes are added etc (no rescaling) and tasks are redistributed among
>    the existing executors as I read it?
>
>  *Error! Filename not specified.*  view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
>
>
>
> On Fri, 4 Feb 2022 at 13:55, Sean Owen <sr...@gmail.com> wrote:
>
> I have not seen stack traces under autoscaling, so not even sure what the
> error in question is.
>
> There is always delay in acquiring a whole new executor in the cloud as it
> usually means a new VM is provisioned.
>
> Spark treats the new executor like any other, available for executing
> tasks.
>
>
>
> On Fri, Feb 4, 2022 at 4:28 AM Mich Talebzadeh <mi...@gmail.com>
> wrote:
>
> Thanks for the info.
>
>
>
> My concern has always been on how Spark handles autoscaling (adding new
> executors) when the load pattern changes.I have tried to test this with
> setting the following parameters (Spark 3.1.2 on GCP)
>
>
>
>         spark-submit --verbose \
>
>         .......
>
>           --conf spark.dynamicAllocation.enabled="true" \
>
>            --conf spark.shuffle.service.enabled="true" \
>
>            --conf spark.dynamicAllocation.minExecutors=2 \
>
>            --conf spark.dynamicAllocation.maxExecutors=10 \
>
>            --conf spark.dynamicAllocation.initialExecutors=4 \
>
>
>
> It is not very clear to me how Spark distributes tasks on the added
> executors and the source of delay. As you have observed there is a delay in
> adding new resources and allocating tasks. If that process is efficient?
>
>
>
> Thanks
>
>
>
>  *Error! Filename not specified.*  view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
>
>
>
> On Fri, 4 Feb 2022 at 03:04, Maksim Grinman <ma...@resolute.ai> wrote:
>
> It's actually on AWS EMR. The job bootstraps and runs fine -- the
> autoscaling group is to bring up a service that spark will be calling. Some
> code waits for the autoscaling group to come up before continuing
> processing in Spark, since the Spark cluster will need to make requests to
> the service in the autoscaling group. It takes several minutes for the
> service to come up, and during the wait, Spark starts to show these thread
> dumps, as presumably it thinks something is wrong since the executor is
> busy waiting and not doing anything. The previous version of Spark did not
> do this (2.4.4).
>
>
>
> On Thu, Feb 3, 2022 at 6:59 PM Mich Talebzadeh <mi...@gmail.com>
> wrote:
>
> Sounds like you are running this on Google Dataproc cluster (spark 3.1.2)
> with auto scaling policy?
>
>
>
>  Can you describe if this happens before Spark starts a new job on the
> cluster or somehow half way through processing an existing job?
>
>
>
> Also is the job involved doing Spark Structured Streaming?
>
>
>
> HTH
>
>
>
>
>
>  *Error! Filename not specified.*  view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
>
>
>
> On Thu, 3 Feb 2022 at 21:29, Maksim Grinman <ma...@resolute.ai> wrote:
>
> We've got a spark task that, after some processing, starts an autoscaling
> group and waits for it to be up before continuing processing. While waiting
> for the autoscaling group, spark starts throwing full thread dumps,
> presumably at the spark.executor.heartbeat interval. Is there a way to
> prevent the thread dumps?
>
>
>
> --
>
> Maksim Grinman
> VP Engineering
> Resolute AI
>
>
>
>
> --
>
> Maksim Grinman
> VP Engineering
> Resolute AI
>
> --
>
> Twitter: https://twitter.com/holdenkarau
>
> Books (Learning Spark, High Performance Spark, etc.):
> https://amzn.to/2MaRAG9  <https://amzn.to/2MaRAG9>
>
> YouTube Live Streams: https://www.youtube.com/user/holdenkarau
>
>
>
>
> --
>
> Maksim Grinman
> VP Engineering
> Resolute AI
>
>
>
>
> --
>
> Maksim Grinman
> VP Engineering
> Resolute AI
>


-- 
Maksim Grinman
VP Engineering
Resolute AI

Re: Spark 3.1.2 full thread dumps

Posted by "Lalwani, Jayesh" <jl...@amazon.com.INVALID>.
You can probably tune writing to elastic search by

  1.  Increasing number of partitions so you are writing smaller batches of rows to elastic search
  2.  Using Elastic search’s bulk api
  3.  Scaling up the number of hot nodes on elastic search cluster to support writing in parallel.

You want to minimize long running tasks. Not just to avoid the “thread dump”. Large number of shorter running tasks are better than Small number of long running tasks, because you can scale up your processing by throwing hardware at it. This is subject to law of diminishing returns; ie; at some point making your tasks smaller will start slowing you down. You need to find the sweet spot.

Generally for elastic search, the sweet spot is that each task writes around 10MB of data using the bulk API. Writing 10MB of data per task should be take order of few seconds. You won’t get the dreaded thread dump if your tasks are taking few seconds

From: Maksim Grinman <ma...@resolute.ai>
Date: Thursday, February 10, 2022 at 7:21 PM
To: "Lalwani, Jayesh" <jl...@amazon.com>
Cc: Mich Talebzadeh <mi...@gmail.com>, Holden Karau <ho...@pigscanfly.ca>, Sean Owen <sr...@gmail.com>, "user @spark" <us...@spark.apache.org>
Subject: RE: [EXTERNAL] Spark 3.1.2 full thread dumps


CAUTION: This email originated from outside of the organization. Do not click links or open attachments unless you can confirm the sender and know the content is safe.


That's fair, but I do get the same thread dump at the last step of the spark job, where we write the final dataframe into an elasticsearch index. It's a df.rdd.map(lambda r: r.asDict(True)).foreachPartition operation which takes a while and we usually get a thread dump during that as well.

On Mon, Feb 7, 2022 at 11:24 AM Lalwani, Jayesh <jl...@amazon.com>> wrote:
Probably not the answer you are looking for, but the best thing to do is to avoid making Spark code sleep. Is there a way you can predict how big your autoscaling group needs to be without looking at all the data? Are you using fixed number of Spark executors or are you have some way of scaling your executors? I am guessing that the size of your autoscaling group is proportional to the number of Spark executors. You can probably measure how many executors each can support. Then you can tie in the size of your autoscaling group to the number of executors.

Alternatively, you can build your service so a) it autoscales as load increases b) throttle requests when the load is higher than it can manage now. This means that when Spark executors start hitting your nodes, your service will throttle many of the requests, and start autoscaling up. Note that this is an established pattern in the cloud. This is how most services on AWS work. The end result is that initially there will be higher latency due to cold start, but the system will catch up eventually

From: Maksim Grinman <ma...@resolute.ai>>
Date: Friday, February 4, 2022 at 9:35 PM
To: Mich Talebzadeh <mi...@gmail.com>>
Cc: Holden Karau <ho...@pigscanfly.ca>>, Sean Owen <sr...@gmail.com>>, "user @spark" <us...@spark.apache.org>>
Subject: RE: [EXTERNAL] Spark 3.1.2 full thread dumps


CAUTION: This email originated from outside of the organization. Do not click links or open attachments unless you can confirm the sender and know the content is safe.


Not that this discussion is not interesting (it is), but this has strayed pretty far from my original question. Which was: How do I prevent spark from dumping huge Java Full Thread dumps when an executor appears to not be doing anything (in my case, there's a loop where it sleeps waiting for a service to come up). The service happens to be set up using an auto-scaling group, a coincidental and unimportant detail that seems to have derailed the conversation.

On Fri, Feb 4, 2022 at 7:18 PM Mich Talebzadeh <mi...@gmail.com>> wrote:
OK basically, do we have a scenario where Spark or for that matter any cluster manager can deploy a new node (after the loss of  an existing node) with the view of running the failed tasks on the new executor(s) deployed on that newly spun node?




 Error! Filename not specified.  view my Linkedin profile<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.




On Sat, 5 Feb 2022 at 00:00, Holden Karau <ho...@pigscanfly.ca>> wrote:
We don’t block scaling up after node failure in classic Spark if that’s the question.

On Fri, Feb 4, 2022 at 6:30 PM Mich Talebzadeh <mi...@gmail.com>> wrote:
From what I can see in auto scaling setup, you will always need a min of two worker nodes as primary. It also states and I quote "Scaling primary workers is not recommended due to HDFS limitations which result in instability while scaling. These limitations do not exist for secondary workers". So the scaling comes with the secondary workers specifying the min and max instances. It also defaults to 2 minutes for the so-called auto scaling cooldown duration hence that delay observed. I presume task allocation to the new executors is FIFO for new tasks. This link<https://docs.qubole.com/en/latest/admin-guide/engine-admin/spark-admin/autoscale-spark.html#:~:text=dynamic%20allocation%20configurations.-,Autoscaling%20in%20Spark%20Clusters,scales%20down%20towards%20the%20minimum.&text=By%20default%2C%20Spark%20uses%20a%20static%20allocation%20of%20resources.> does some explanation on autoscaling.

Handling Spot Node Loss and Spot Blocks in Spark Clusters
"When the Spark AM receives the spot loss (Spot Node Loss or Spot Blocks) notification from the RM, it notifies the Spark driver. The driver then performs the following actions:
1.    Identifies all the executors affected by the upcoming node loss.
2.    Moves all of the affected executors to a decommissioning state, and no new tasks are scheduled on these executors.
3.    Kills all the executors after reaching 50% of the termination time.
4.    Starts the failed tasks (if any) on other executors.
5.    For these nodes, it removes all the entries of the shuffle data from the map output tracker on driver after reaching 90% of the termination time. This helps in preventing the shuffle-fetch failures due to spot loss.
6.    Recomputes the shuffle data from the lost node by stage resubmission and at the time shuffles data of spot node if required."

  1.
  2.  So basically when a node fails classic spark comes into play and no new nodes are added etc (no rescaling) and tasks are redistributed among the existing executors as I read it?
 Error! Filename not specified.  view my Linkedin profile<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.




On Fri, 4 Feb 2022 at 13:55, Sean Owen <sr...@gmail.com>> wrote:
I have not seen stack traces under autoscaling, so not even sure what the error in question is.
There is always delay in acquiring a whole new executor in the cloud as it usually means a new VM is provisioned.
Spark treats the new executor like any other, available for executing tasks.

On Fri, Feb 4, 2022 at 4:28 AM Mich Talebzadeh <mi...@gmail.com>> wrote:
Thanks for the info.

My concern has always been on how Spark handles autoscaling (adding new executors) when the load pattern changes.I have tried to test this with setting the following parameters (Spark 3.1.2 on GCP)

        spark-submit --verbose \
        .......
          --conf spark.dynamicAllocation.enabled="true" \
           --conf spark.shuffle.service.enabled="true" \
           --conf spark.dynamicAllocation.minExecutors=2 \
           --conf spark.dynamicAllocation.maxExecutors=10 \
           --conf spark.dynamicAllocation.initialExecutors=4 \

It is not very clear to me how Spark distributes tasks on the added executors and the source of delay. As you have observed there is a delay in adding new resources and allocating tasks. If that process is efficient?

Thanks

 Error! Filename not specified.  view my Linkedin profile<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.




On Fri, 4 Feb 2022 at 03:04, Maksim Grinman <ma...@resolute.ai>> wrote:
It's actually on AWS EMR. The job bootstraps and runs fine -- the autoscaling group is to bring up a service that spark will be calling. Some code waits for the autoscaling group to come up before continuing processing in Spark, since the Spark cluster will need to make requests to the service in the autoscaling group. It takes several minutes for the service to come up, and during the wait, Spark starts to show these thread dumps, as presumably it thinks something is wrong since the executor is busy waiting and not doing anything. The previous version of Spark did not do this (2.4.4).

On Thu, Feb 3, 2022 at 6:59 PM Mich Talebzadeh <mi...@gmail.com>> wrote:
Sounds like you are running this on Google Dataproc cluster (spark 3.1.2)  with auto scaling policy?

 Can you describe if this happens before Spark starts a new job on the cluster or somehow half way through processing an existing job?

Also is the job involved doing Spark Structured Streaming?

HTH




 Error! Filename not specified.  view my Linkedin profile<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.




On Thu, 3 Feb 2022 at 21:29, Maksim Grinman <ma...@resolute.ai>> wrote:
We've got a spark task that, after some processing, starts an autoscaling group and waits for it to be up before continuing processing. While waiting for the autoscaling group, spark starts throwing full thread dumps, presumably at the spark.executor.heartbeat interval. Is there a way to prevent the thread dumps?

--
Maksim Grinman
VP Engineering
Resolute AI


--
Maksim Grinman
VP Engineering
Resolute AI
--
Twitter: https://twitter.com/holdenkarau
Books (Learning Spark, High Performance Spark, etc.): https://amzn.to/2MaRAG9 <https://amzn.to/2MaRAG9>
YouTube Live Streams: https://www.youtube.com/user/holdenkarau


--
Maksim Grinman
VP Engineering
Resolute AI


--
Maksim Grinman
VP Engineering
Resolute AI

Re: Spark 3.1.2 full thread dumps

Posted by "Lalwani, Jayesh" <jl...@amazon.com.INVALID>.
Probably not the answer you are looking for, but the best thing to do is to avoid making Spark code sleep. Is there a way you can predict how big your autoscaling group needs to be without looking at all the data? Are you using fixed number of Spark executors or are you have some way of scaling your executors? I am guessing that the size of your autoscaling group is proportional to the number of Spark executors. You can probably measure how many executors each can support. Then you can tie in the size of your autoscaling group to the number of executors.

Alternatively, you can build your service so a) it autoscales as load increases b) throttle requests when the load is higher than it can manage now. This means that when Spark executors start hitting your nodes, your service will throttle many of the requests, and start autoscaling up. Note that this is an established pattern in the cloud. This is how most services on AWS work. The end result is that initially there will be higher latency due to cold start, but the system will catch up eventually

From: Maksim Grinman <ma...@resolute.ai>
Date: Friday, February 4, 2022 at 9:35 PM
To: Mich Talebzadeh <mi...@gmail.com>
Cc: Holden Karau <ho...@pigscanfly.ca>, Sean Owen <sr...@gmail.com>, "user @spark" <us...@spark.apache.org>
Subject: RE: [EXTERNAL] Spark 3.1.2 full thread dumps


CAUTION: This email originated from outside of the organization. Do not click links or open attachments unless you can confirm the sender and know the content is safe.


Not that this discussion is not interesting (it is), but this has strayed pretty far from my original question. Which was: How do I prevent spark from dumping huge Java Full Thread dumps when an executor appears to not be doing anything (in my case, there's a loop where it sleeps waiting for a service to come up). The service happens to be set up using an auto-scaling group, a coincidental and unimportant detail that seems to have derailed the conversation.

On Fri, Feb 4, 2022 at 7:18 PM Mich Talebzadeh <mi...@gmail.com>> wrote:
OK basically, do we have a scenario where Spark or for that matter any cluster manager can deploy a new node (after the loss of  an existing node) with the view of running the failed tasks on the new executor(s) deployed on that newly spun node?




 [Image removed by sender.]   view my Linkedin profile<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.




On Sat, 5 Feb 2022 at 00:00, Holden Karau <ho...@pigscanfly.ca>> wrote:
We don’t block scaling up after node failure in classic Spark if that’s the question.

On Fri, Feb 4, 2022 at 6:30 PM Mich Talebzadeh <mi...@gmail.com>> wrote:
From what I can see in auto scaling setup, you will always need a min of two worker nodes as primary. It also states and I quote "Scaling primary workers is not recommended due to HDFS limitations which result in instability while scaling. These limitations do not exist for secondary workers". So the scaling comes with the secondary workers specifying the min and max instances. It also defaults to 2 minutes for the so-called auto scaling cooldown duration hence that delay observed. I presume task allocation to the new executors is FIFO for new tasks. This link<https://docs.qubole.com/en/latest/admin-guide/engine-admin/spark-admin/autoscale-spark.html#:~:text=dynamic%20allocation%20configurations.-,Autoscaling%20in%20Spark%20Clusters,scales%20down%20towards%20the%20minimum.&text=By%20default%2C%20Spark%20uses%20a%20static%20allocation%20of%20resources.> does some explanation on autoscaling.

Handling Spot Node Loss and Spot Blocks in Spark Clusters
"When the Spark AM receives the spot loss (Spot Node Loss or Spot Blocks) notification from the RM, it notifies the Spark driver. The driver then performs the following actions:
1.    Identifies all the executors affected by the upcoming node loss.
2.    Moves all of the affected executors to a decommissioning state, and no new tasks are scheduled on these executors.
3.    Kills all the executors after reaching 50% of the termination time.
4.    Starts the failed tasks (if any) on other executors.
5.    For these nodes, it removes all the entries of the shuffle data from the map output tracker on driver after reaching 90% of the termination time. This helps in preventing the shuffle-fetch failures due to spot loss.
6.    Recomputes the shuffle data from the lost node by stage resubmission and at the time shuffles data of spot node if required."

  1.
  2.  So basically when a node fails classic spark comes into play and no new nodes are added etc (no rescaling) and tasks are redistributed among the existing executors as I read it?
 [Image removed by sender.]   view my Linkedin profile<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.




On Fri, 4 Feb 2022 at 13:55, Sean Owen <sr...@gmail.com>> wrote:
I have not seen stack traces under autoscaling, so not even sure what the error in question is.
There is always delay in acquiring a whole new executor in the cloud as it usually means a new VM is provisioned.
Spark treats the new executor like any other, available for executing tasks.

On Fri, Feb 4, 2022 at 4:28 AM Mich Talebzadeh <mi...@gmail.com>> wrote:
Thanks for the info.

My concern has always been on how Spark handles autoscaling (adding new executors) when the load pattern changes.I have tried to test this with setting the following parameters (Spark 3.1.2 on GCP)

        spark-submit --verbose \
        .......
          --conf spark.dynamicAllocation.enabled="true" \
           --conf spark.shuffle.service.enabled="true" \
           --conf spark.dynamicAllocation.minExecutors=2 \
           --conf spark.dynamicAllocation.maxExecutors=10 \
           --conf spark.dynamicAllocation.initialExecutors=4 \

It is not very clear to me how Spark distributes tasks on the added executors and the source of delay. As you have observed there is a delay in adding new resources and allocating tasks. If that process is efficient?

Thanks

 [Image removed by sender.]   view my Linkedin profile<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.




On Fri, 4 Feb 2022 at 03:04, Maksim Grinman <ma...@resolute.ai>> wrote:
It's actually on AWS EMR. The job bootstraps and runs fine -- the autoscaling group is to bring up a service that spark will be calling. Some code waits for the autoscaling group to come up before continuing processing in Spark, since the Spark cluster will need to make requests to the service in the autoscaling group. It takes several minutes for the service to come up, and during the wait, Spark starts to show these thread dumps, as presumably it thinks something is wrong since the executor is busy waiting and not doing anything. The previous version of Spark did not do this (2.4.4).

On Thu, Feb 3, 2022 at 6:59 PM Mich Talebzadeh <mi...@gmail.com>> wrote:
Sounds like you are running this on Google Dataproc cluster (spark 3.1.2)  with auto scaling policy?

 Can you describe if this happens before Spark starts a new job on the cluster or somehow half way through processing an existing job?

Also is the job involved doing Spark Structured Streaming?

HTH




 [Image removed by sender.]   view my Linkedin profile<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.




On Thu, 3 Feb 2022 at 21:29, Maksim Grinman <ma...@resolute.ai>> wrote:
We've got a spark task that, after some processing, starts an autoscaling group and waits for it to be up before continuing processing. While waiting for the autoscaling group, spark starts throwing full thread dumps, presumably at the spark.executor.heartbeat interval. Is there a way to prevent the thread dumps?

--
Maksim Grinman
VP Engineering
Resolute AI


--
Maksim Grinman
VP Engineering
Resolute AI
--
Twitter: https://twitter.com/holdenkarau
Books (Learning Spark, High Performance Spark, etc.): https://amzn.to/2MaRAG9 <https://amzn.to/2MaRAG9>
YouTube Live Streams: https://www.youtube.com/user/holdenkarau


--
Maksim Grinman
VP Engineering
Resolute AI

Re: Spark 3.1.2 full thread dumps

Posted by Maksim Grinman <ma...@resolute.ai>.
Not that this discussion is not interesting (it is), but this has strayed
pretty far from my original question. Which was: How do I prevent spark
from dumping huge Java Full Thread dumps when an executor appears to not be
doing anything (in my case, there's a loop where it sleeps waiting for a
service to come up). The service happens to be set up using an auto-scaling
group, a coincidental and unimportant detail that seems to have derailed
the conversation.

On Fri, Feb 4, 2022 at 7:18 PM Mich Talebzadeh <mi...@gmail.com>
wrote:

> OK basically, do we have a scenario where Spark or for that matter any
> cluster manager can deploy a new node (after the loss of  an existing node)
> with the view of running the failed tasks on the new executor(s) deployed
> on that newly spun node?
>
>
>
>    view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Sat, 5 Feb 2022 at 00:00, Holden Karau <ho...@pigscanfly.ca> wrote:
>
>> We don’t block scaling up after node failure in classic Spark if that’s
>> the question.
>>
>> On Fri, Feb 4, 2022 at 6:30 PM Mich Talebzadeh <mi...@gmail.com>
>> wrote:
>>
>>> From what I can see in auto scaling setup, you will always need a min of
>>> two worker nodes as primary. It also states and I quote "Scaling
>>> primary workers is not recommended due to HDFS limitations which result in
>>> instability while scaling. These limitations do not exist for secondary
>>> workers". So the scaling comes with the secondary workers specifying the
>>> min and max instances. It also defaults to 2 minutes for the so-called auto
>>> scaling cooldown duration hence that delay observed. I presume task
>>> allocation to the new executors is FIFO for new tasks. This link
>>> <https://docs.qubole.com/en/latest/admin-guide/engine-admin/spark-admin/autoscale-spark.html#:~:text=dynamic%20allocation%20configurations.-,Autoscaling%20in%20Spark%20Clusters,scales%20down%20towards%20the%20minimum.&text=By%20default%2C%20Spark%20uses%20a%20static%20allocation%20of%20resources.>
>>> does some explanation on autoscaling.
>>>
>>> Handling Spot Node Loss and Spot Blocks in Spark Clusters
>>> "When the Spark AM receives the spot loss (Spot Node Loss or Spot
>>> Blocks) notification from the RM, it notifies the Spark driver. The driver
>>> then performs the following actions:
>>>
>>>    1. Identifies all the executors affected by the upcoming node loss.
>>>    2. Moves all of the affected executors to a decommissioning state,
>>>    and no new tasks are scheduled on these executors.
>>>    3. Kills all the executors after reaching 50% of the termination
>>>    time.
>>>    4. *Starts the failed tasks (if any) on other executors.*
>>>    5. For these nodes, it removes all the entries of the shuffle data
>>>    from the map output tracker on driver after reaching 90% of the termination
>>>    time. This helps in preventing the shuffle-fetch failures due to spot loss.
>>>    6. Recomputes the shuffle data from the lost node by stage
>>>    resubmission and at the time shuffles data of spot node if required."
>>>    7.
>>>    8. So basically when a node fails classic spark comes into play and
>>>    no new nodes are added etc (no rescaling) and tasks are redistributed among
>>>    the existing executors as I read it?
>>>
>>>    view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Fri, 4 Feb 2022 at 13:55, Sean Owen <sr...@gmail.com> wrote:
>>>
>>>> I have not seen stack traces under autoscaling, so not even sure what
>>>> the error in question is.
>>>> There is always delay in acquiring a whole new executor in the cloud as
>>>> it usually means a new VM is provisioned.
>>>> Spark treats the new executor like any other, available for executing
>>>> tasks.
>>>>
>>>> On Fri, Feb 4, 2022 at 4:28 AM Mich Talebzadeh <
>>>> mich.talebzadeh@gmail.com> wrote:
>>>>
>>>>> Thanks for the info.
>>>>>
>>>>> My concern has always been on how Spark handles autoscaling (adding
>>>>> new executors) when the load pattern changes.I have tried to test this with
>>>>> setting the following parameters (Spark 3.1.2 on GCP)
>>>>>
>>>>>         spark-submit --verbose \
>>>>>         .......
>>>>>           --conf spark.dynamicAllocation.enabled="true" \
>>>>>            --conf spark.shuffle.service.enabled="true" \
>>>>>            --conf spark.dynamicAllocation.minExecutors=2 \
>>>>>            --conf spark.dynamicAllocation.maxExecutors=10 \
>>>>>            --conf spark.dynamicAllocation.initialExecutors=4 \
>>>>>
>>>>> It is not very clear to me how Spark distributes tasks on the added
>>>>> executors and the source of delay. As you have observed there is a delay in
>>>>> adding new resources and allocating tasks. If that process is efficient?
>>>>>
>>>>> Thanks
>>>>>
>>>>>    view my Linkedin profile
>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>
>>>>>
>>>>>
>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>>> any loss, damage or destruction of data or any other property which may
>>>>> arise from relying on this email's technical content is explicitly
>>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>>> arising from such loss, damage or destruction.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Fri, 4 Feb 2022 at 03:04, Maksim Grinman <ma...@resolute.ai> wrote:
>>>>>
>>>>>> It's actually on AWS EMR. The job bootstraps and runs fine -- the
>>>>>> autoscaling group is to bring up a service that spark will be calling. Some
>>>>>> code waits for the autoscaling group to come up before continuing
>>>>>> processing in Spark, since the Spark cluster will need to make requests to
>>>>>> the service in the autoscaling group. It takes several minutes for the
>>>>>> service to come up, and during the wait, Spark starts to show these thread
>>>>>> dumps, as presumably it thinks something is wrong since the executor is
>>>>>> busy waiting and not doing anything. The previous version of Spark did not
>>>>>> do this (2.4.4).
>>>>>>
>>>>>> On Thu, Feb 3, 2022 at 6:59 PM Mich Talebzadeh <
>>>>>> mich.talebzadeh@gmail.com> wrote:
>>>>>>
>>>>>>> Sounds like you are running this on Google Dataproc cluster (spark
>>>>>>> 3.1.2)  with auto scaling policy?
>>>>>>>
>>>>>>>  Can you describe if this happens before Spark starts a new job on
>>>>>>> the cluster or somehow half way through processing an existing job?
>>>>>>>
>>>>>>> Also is the job involved doing Spark Structured Streaming?
>>>>>>>
>>>>>>> HTH
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>    view my Linkedin profile
>>>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility
>>>>>>> for any loss, damage or destruction of data or any other property which may
>>>>>>> arise from relying on this email's technical content is explicitly
>>>>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>>>>> arising from such loss, damage or destruction.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Thu, 3 Feb 2022 at 21:29, Maksim Grinman <ma...@resolute.ai>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> We've got a spark task that, after some processing, starts an
>>>>>>>> autoscaling group and waits for it to be up before continuing processing.
>>>>>>>> While waiting for the autoscaling group, spark starts throwing full thread
>>>>>>>> dumps, presumably at the spark.executor.heartbeat interval. Is there a way
>>>>>>>> to prevent the thread dumps?
>>>>>>>>
>>>>>>>> --
>>>>>>>> Maksim Grinman
>>>>>>>> VP Engineering
>>>>>>>> Resolute AI
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>> --
>>>>>> Maksim Grinman
>>>>>> VP Engineering
>>>>>> Resolute AI
>>>>>>
>>>>> --
>> Twitter: https://twitter.com/holdenkarau
>> Books (Learning Spark, High Performance Spark, etc.):
>> https://amzn.to/2MaRAG9  <https://amzn.to/2MaRAG9>
>> YouTube Live Streams: https://www.youtube.com/user/holdenkarau
>>
>

-- 
Maksim Grinman
VP Engineering
Resolute AI

Re: Spark 3.1.2 full thread dumps

Posted by Mich Talebzadeh <mi...@gmail.com>.
OK basically, do we have a scenario where Spark or for that matter any
cluster manager can deploy a new node (after the loss of  an existing node)
with the view of running the failed tasks on the new executor(s) deployed
on that newly spun node?



   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Sat, 5 Feb 2022 at 00:00, Holden Karau <ho...@pigscanfly.ca> wrote:

> We don’t block scaling up after node failure in classic Spark if that’s
> the question.
>
> On Fri, Feb 4, 2022 at 6:30 PM Mich Talebzadeh <mi...@gmail.com>
> wrote:
>
>> From what I can see in auto scaling setup, you will always need a min of
>> two worker nodes as primary. It also states and I quote "Scaling primary
>> workers is not recommended due to HDFS limitations which result in
>> instability while scaling. These limitations do not exist for secondary
>> workers". So the scaling comes with the secondary workers specifying the
>> min and max instances. It also defaults to 2 minutes for the so-called auto
>> scaling cooldown duration hence that delay observed. I presume task
>> allocation to the new executors is FIFO for new tasks. This link
>> <https://docs.qubole.com/en/latest/admin-guide/engine-admin/spark-admin/autoscale-spark.html#:~:text=dynamic%20allocation%20configurations.-,Autoscaling%20in%20Spark%20Clusters,scales%20down%20towards%20the%20minimum.&text=By%20default%2C%20Spark%20uses%20a%20static%20allocation%20of%20resources.>
>> does some explanation on autoscaling.
>>
>> Handling Spot Node Loss and Spot Blocks in Spark Clusters
>> "When the Spark AM receives the spot loss (Spot Node Loss or Spot
>> Blocks) notification from the RM, it notifies the Spark driver. The driver
>> then performs the following actions:
>>
>>    1. Identifies all the executors affected by the upcoming node loss.
>>    2. Moves all of the affected executors to a decommissioning state,
>>    and no new tasks are scheduled on these executors.
>>    3. Kills all the executors after reaching 50% of the termination time.
>>    4. *Starts the failed tasks (if any) on other executors.*
>>    5. For these nodes, it removes all the entries of the shuffle data
>>    from the map output tracker on driver after reaching 90% of the termination
>>    time. This helps in preventing the shuffle-fetch failures due to spot loss.
>>    6. Recomputes the shuffle data from the lost node by stage
>>    resubmission and at the time shuffles data of spot node if required."
>>    7.
>>    8. So basically when a node fails classic spark comes into play and
>>    no new nodes are added etc (no rescaling) and tasks are redistributed among
>>    the existing executors as I read it?
>>
>>    view my Linkedin profile
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Fri, 4 Feb 2022 at 13:55, Sean Owen <sr...@gmail.com> wrote:
>>
>>> I have not seen stack traces under autoscaling, so not even sure what
>>> the error in question is.
>>> There is always delay in acquiring a whole new executor in the cloud as
>>> it usually means a new VM is provisioned.
>>> Spark treats the new executor like any other, available for executing
>>> tasks.
>>>
>>> On Fri, Feb 4, 2022 at 4:28 AM Mich Talebzadeh <
>>> mich.talebzadeh@gmail.com> wrote:
>>>
>>>> Thanks for the info.
>>>>
>>>> My concern has always been on how Spark handles autoscaling (adding new
>>>> executors) when the load pattern changes.I have tried to test this with
>>>> setting the following parameters (Spark 3.1.2 on GCP)
>>>>
>>>>         spark-submit --verbose \
>>>>         .......
>>>>           --conf spark.dynamicAllocation.enabled="true" \
>>>>            --conf spark.shuffle.service.enabled="true" \
>>>>            --conf spark.dynamicAllocation.minExecutors=2 \
>>>>            --conf spark.dynamicAllocation.maxExecutors=10 \
>>>>            --conf spark.dynamicAllocation.initialExecutors=4 \
>>>>
>>>> It is not very clear to me how Spark distributes tasks on the added
>>>> executors and the source of delay. As you have observed there is a delay in
>>>> adding new resources and allocating tasks. If that process is efficient?
>>>>
>>>> Thanks
>>>>
>>>>    view my Linkedin profile
>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>
>>>>
>>>>
>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>> any loss, damage or destruction of data or any other property which may
>>>> arise from relying on this email's technical content is explicitly
>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>> arising from such loss, damage or destruction.
>>>>
>>>>
>>>>
>>>>
>>>> On Fri, 4 Feb 2022 at 03:04, Maksim Grinman <ma...@resolute.ai> wrote:
>>>>
>>>>> It's actually on AWS EMR. The job bootstraps and runs fine -- the
>>>>> autoscaling group is to bring up a service that spark will be calling. Some
>>>>> code waits for the autoscaling group to come up before continuing
>>>>> processing in Spark, since the Spark cluster will need to make requests to
>>>>> the service in the autoscaling group. It takes several minutes for the
>>>>> service to come up, and during the wait, Spark starts to show these thread
>>>>> dumps, as presumably it thinks something is wrong since the executor is
>>>>> busy waiting and not doing anything. The previous version of Spark did not
>>>>> do this (2.4.4).
>>>>>
>>>>> On Thu, Feb 3, 2022 at 6:59 PM Mich Talebzadeh <
>>>>> mich.talebzadeh@gmail.com> wrote:
>>>>>
>>>>>> Sounds like you are running this on Google Dataproc cluster (spark
>>>>>> 3.1.2)  with auto scaling policy?
>>>>>>
>>>>>>  Can you describe if this happens before Spark starts a new job on
>>>>>> the cluster or somehow half way through processing an existing job?
>>>>>>
>>>>>> Also is the job involved doing Spark Structured Streaming?
>>>>>>
>>>>>> HTH
>>>>>>
>>>>>>
>>>>>>
>>>>>>    view my Linkedin profile
>>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>>
>>>>>>
>>>>>>
>>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility
>>>>>> for any loss, damage or destruction of data or any other property which may
>>>>>> arise from relying on this email's technical content is explicitly
>>>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>>>> arising from such loss, damage or destruction.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Thu, 3 Feb 2022 at 21:29, Maksim Grinman <ma...@resolute.ai> wrote:
>>>>>>
>>>>>>> We've got a spark task that, after some processing, starts an
>>>>>>> autoscaling group and waits for it to be up before continuing processing.
>>>>>>> While waiting for the autoscaling group, spark starts throwing full thread
>>>>>>> dumps, presumably at the spark.executor.heartbeat interval. Is there a way
>>>>>>> to prevent the thread dumps?
>>>>>>>
>>>>>>> --
>>>>>>> Maksim Grinman
>>>>>>> VP Engineering
>>>>>>> Resolute AI
>>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> Maksim Grinman
>>>>> VP Engineering
>>>>> Resolute AI
>>>>>
>>>> --
> Twitter: https://twitter.com/holdenkarau
> Books (Learning Spark, High Performance Spark, etc.):
> https://amzn.to/2MaRAG9  <https://amzn.to/2MaRAG9>
> YouTube Live Streams: https://www.youtube.com/user/holdenkarau
>

Re: Spark 3.1.2 full thread dumps

Posted by Holden Karau <ho...@pigscanfly.ca>.
We don’t block scaling up after node failure in classic Spark if that’s the
question.

On Fri, Feb 4, 2022 at 6:30 PM Mich Talebzadeh <mi...@gmail.com>
wrote:

> From what I can see in auto scaling setup, you will always need a min of
> two worker nodes as primary. It also states and I quote "Scaling primary
> workers is not recommended due to HDFS limitations which result in
> instability while scaling. These limitations do not exist for secondary
> workers". So the scaling comes with the secondary workers specifying the
> min and max instances. It also defaults to 2 minutes for the so-called auto
> scaling cooldown duration hence that delay observed. I presume task
> allocation to the new executors is FIFO for new tasks. This link
> <https://docs.qubole.com/en/latest/admin-guide/engine-admin/spark-admin/autoscale-spark.html#:~:text=dynamic%20allocation%20configurations.-,Autoscaling%20in%20Spark%20Clusters,scales%20down%20towards%20the%20minimum.&text=By%20default%2C%20Spark%20uses%20a%20static%20allocation%20of%20resources.>
> does some explanation on autoscaling.
>
> Handling Spot Node Loss and Spot Blocks in Spark Clusters
> "When the Spark AM receives the spot loss (Spot Node Loss or Spot Blocks)
> notification from the RM, it notifies the Spark driver. The driver then
> performs the following actions:
>
>    1. Identifies all the executors affected by the upcoming node loss.
>    2. Moves all of the affected executors to a decommissioning state, and
>    no new tasks are scheduled on these executors.
>    3. Kills all the executors after reaching 50% of the termination time.
>    4. *Starts the failed tasks (if any) on other executors.*
>    5. For these nodes, it removes all the entries of the shuffle data
>    from the map output tracker on driver after reaching 90% of the termination
>    time. This helps in preventing the shuffle-fetch failures due to spot loss.
>    6. Recomputes the shuffle data from the lost node by stage
>    resubmission and at the time shuffles data of spot node if required."
>    7.
>    8. So basically when a node fails classic spark comes into play and no
>    new nodes are added etc (no rescaling) and tasks are redistributed among
>    the existing executors as I read it?
>
>    view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Fri, 4 Feb 2022 at 13:55, Sean Owen <sr...@gmail.com> wrote:
>
>> I have not seen stack traces under autoscaling, so not even sure what the
>> error in question is.
>> There is always delay in acquiring a whole new executor in the cloud as
>> it usually means a new VM is provisioned.
>> Spark treats the new executor like any other, available for executing
>> tasks.
>>
>> On Fri, Feb 4, 2022 at 4:28 AM Mich Talebzadeh <mi...@gmail.com>
>> wrote:
>>
>>> Thanks for the info.
>>>
>>> My concern has always been on how Spark handles autoscaling (adding new
>>> executors) when the load pattern changes.I have tried to test this with
>>> setting the following parameters (Spark 3.1.2 on GCP)
>>>
>>>         spark-submit --verbose \
>>>         .......
>>>           --conf spark.dynamicAllocation.enabled="true" \
>>>            --conf spark.shuffle.service.enabled="true" \
>>>            --conf spark.dynamicAllocation.minExecutors=2 \
>>>            --conf spark.dynamicAllocation.maxExecutors=10 \
>>>            --conf spark.dynamicAllocation.initialExecutors=4 \
>>>
>>> It is not very clear to me how Spark distributes tasks on the added
>>> executors and the source of delay. As you have observed there is a delay in
>>> adding new resources and allocating tasks. If that process is efficient?
>>>
>>> Thanks
>>>
>>>    view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Fri, 4 Feb 2022 at 03:04, Maksim Grinman <ma...@resolute.ai> wrote:
>>>
>>>> It's actually on AWS EMR. The job bootstraps and runs fine -- the
>>>> autoscaling group is to bring up a service that spark will be calling. Some
>>>> code waits for the autoscaling group to come up before continuing
>>>> processing in Spark, since the Spark cluster will need to make requests to
>>>> the service in the autoscaling group. It takes several minutes for the
>>>> service to come up, and during the wait, Spark starts to show these thread
>>>> dumps, as presumably it thinks something is wrong since the executor is
>>>> busy waiting and not doing anything. The previous version of Spark did not
>>>> do this (2.4.4).
>>>>
>>>> On Thu, Feb 3, 2022 at 6:59 PM Mich Talebzadeh <
>>>> mich.talebzadeh@gmail.com> wrote:
>>>>
>>>>> Sounds like you are running this on Google Dataproc cluster (spark
>>>>> 3.1.2)  with auto scaling policy?
>>>>>
>>>>>  Can you describe if this happens before Spark starts a new job on the
>>>>> cluster or somehow half way through processing an existing job?
>>>>>
>>>>> Also is the job involved doing Spark Structured Streaming?
>>>>>
>>>>> HTH
>>>>>
>>>>>
>>>>>
>>>>>    view my Linkedin profile
>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>
>>>>>
>>>>>
>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>>> any loss, damage or destruction of data or any other property which may
>>>>> arise from relying on this email's technical content is explicitly
>>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>>> arising from such loss, damage or destruction.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Thu, 3 Feb 2022 at 21:29, Maksim Grinman <ma...@resolute.ai> wrote:
>>>>>
>>>>>> We've got a spark task that, after some processing, starts an
>>>>>> autoscaling group and waits for it to be up before continuing processing.
>>>>>> While waiting for the autoscaling group, spark starts throwing full thread
>>>>>> dumps, presumably at the spark.executor.heartbeat interval. Is there a way
>>>>>> to prevent the thread dumps?
>>>>>>
>>>>>> --
>>>>>> Maksim Grinman
>>>>>> VP Engineering
>>>>>> Resolute AI
>>>>>>
>>>>>
>>>>
>>>> --
>>>> Maksim Grinman
>>>> VP Engineering
>>>> Resolute AI
>>>>
>>> --
Twitter: https://twitter.com/holdenkarau
Books (Learning Spark, High Performance Spark, etc.):
https://amzn.to/2MaRAG9  <https://amzn.to/2MaRAG9>
YouTube Live Streams: https://www.youtube.com/user/holdenkarau

Re: Spark 3.1.2 full thread dumps

Posted by Mich Talebzadeh <mi...@gmail.com>.
From what I can see in auto scaling setup, you will always need a min of
two worker nodes as primary. It also states and I quote "Scaling primary
workers is not recommended due to HDFS limitations which result in
instability while scaling. These limitations do not exist for secondary
workers". So the scaling comes with the secondary workers specifying the
min and max instances. It also defaults to 2 minutes for the so-called auto
scaling cooldown duration hence that delay observed. I presume task
allocation to the new executors is FIFO for new tasks. This link
<https://docs.qubole.com/en/latest/admin-guide/engine-admin/spark-admin/autoscale-spark.html#:~:text=dynamic%20allocation%20configurations.-,Autoscaling%20in%20Spark%20Clusters,scales%20down%20towards%20the%20minimum.&text=By%20default%2C%20Spark%20uses%20a%20static%20allocation%20of%20resources.>
does some explanation on autoscaling.

Handling Spot Node Loss and Spot Blocks in Spark Clusters
"When the Spark AM receives the spot loss (Spot Node Loss or Spot Blocks)
notification from the RM, it notifies the Spark driver. The driver then
performs the following actions:

   1. Identifies all the executors affected by the upcoming node loss.
   2. Moves all of the affected executors to a decommissioning state, and
   no new tasks are scheduled on these executors.
   3. Kills all the executors after reaching 50% of the termination time.
   4. *Starts the failed tasks (if any) on other executors.*
   5. For these nodes, it removes all the entries of the shuffle data from
   the map output tracker on driver after reaching 90% of the termination
   time. This helps in preventing the shuffle-fetch failures due to spot loss.
   6. Recomputes the shuffle data from the lost node by stage resubmission
   and at the time shuffles data of spot node if required."
   7.
   8. So basically when a node fails classic spark comes into play and no
   new nodes are added etc (no rescaling) and tasks are redistributed among
   the existing executors as I read it?

   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Fri, 4 Feb 2022 at 13:55, Sean Owen <sr...@gmail.com> wrote:

> I have not seen stack traces under autoscaling, so not even sure what the
> error in question is.
> There is always delay in acquiring a whole new executor in the cloud as it
> usually means a new VM is provisioned.
> Spark treats the new executor like any other, available for executing
> tasks.
>
> On Fri, Feb 4, 2022 at 4:28 AM Mich Talebzadeh <mi...@gmail.com>
> wrote:
>
>> Thanks for the info.
>>
>> My concern has always been on how Spark handles autoscaling (adding new
>> executors) when the load pattern changes.I have tried to test this with
>> setting the following parameters (Spark 3.1.2 on GCP)
>>
>>         spark-submit --verbose \
>>         .......
>>           --conf spark.dynamicAllocation.enabled="true" \
>>            --conf spark.shuffle.service.enabled="true" \
>>            --conf spark.dynamicAllocation.minExecutors=2 \
>>            --conf spark.dynamicAllocation.maxExecutors=10 \
>>            --conf spark.dynamicAllocation.initialExecutors=4 \
>>
>> It is not very clear to me how Spark distributes tasks on the added
>> executors and the source of delay. As you have observed there is a delay in
>> adding new resources and allocating tasks. If that process is efficient?
>>
>> Thanks
>>
>>    view my Linkedin profile
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Fri, 4 Feb 2022 at 03:04, Maksim Grinman <ma...@resolute.ai> wrote:
>>
>>> It's actually on AWS EMR. The job bootstraps and runs fine -- the
>>> autoscaling group is to bring up a service that spark will be calling. Some
>>> code waits for the autoscaling group to come up before continuing
>>> processing in Spark, since the Spark cluster will need to make requests to
>>> the service in the autoscaling group. It takes several minutes for the
>>> service to come up, and during the wait, Spark starts to show these thread
>>> dumps, as presumably it thinks something is wrong since the executor is
>>> busy waiting and not doing anything. The previous version of Spark did not
>>> do this (2.4.4).
>>>
>>> On Thu, Feb 3, 2022 at 6:59 PM Mich Talebzadeh <
>>> mich.talebzadeh@gmail.com> wrote:
>>>
>>>> Sounds like you are running this on Google Dataproc cluster (spark
>>>> 3.1.2)  with auto scaling policy?
>>>>
>>>>  Can you describe if this happens before Spark starts a new job on the
>>>> cluster or somehow half way through processing an existing job?
>>>>
>>>> Also is the job involved doing Spark Structured Streaming?
>>>>
>>>> HTH
>>>>
>>>>
>>>>
>>>>    view my Linkedin profile
>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>
>>>>
>>>>
>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>> any loss, damage or destruction of data or any other property which may
>>>> arise from relying on this email's technical content is explicitly
>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>> arising from such loss, damage or destruction.
>>>>
>>>>
>>>>
>>>>
>>>> On Thu, 3 Feb 2022 at 21:29, Maksim Grinman <ma...@resolute.ai> wrote:
>>>>
>>>>> We've got a spark task that, after some processing, starts an
>>>>> autoscaling group and waits for it to be up before continuing processing.
>>>>> While waiting for the autoscaling group, spark starts throwing full thread
>>>>> dumps, presumably at the spark.executor.heartbeat interval. Is there a way
>>>>> to prevent the thread dumps?
>>>>>
>>>>> --
>>>>> Maksim Grinman
>>>>> VP Engineering
>>>>> Resolute AI
>>>>>
>>>>
>>>
>>> --
>>> Maksim Grinman
>>> VP Engineering
>>> Resolute AI
>>>
>>

Re: Spark 3.1.2 full thread dumps

Posted by Sean Owen <sr...@gmail.com>.
I have not seen stack traces under autoscaling, so not even sure what the
error in question is.
There is always delay in acquiring a whole new executor in the cloud as it
usually means a new VM is provisioned.
Spark treats the new executor like any other, available for executing tasks.

On Fri, Feb 4, 2022 at 4:28 AM Mich Talebzadeh <mi...@gmail.com>
wrote:

> Thanks for the info.
>
> My concern has always been on how Spark handles autoscaling (adding new
> executors) when the load pattern changes.I have tried to test this with
> setting the following parameters (Spark 3.1.2 on GCP)
>
>         spark-submit --verbose \
>         .......
>           --conf spark.dynamicAllocation.enabled="true" \
>            --conf spark.shuffle.service.enabled="true" \
>            --conf spark.dynamicAllocation.minExecutors=2 \
>            --conf spark.dynamicAllocation.maxExecutors=10 \
>            --conf spark.dynamicAllocation.initialExecutors=4 \
>
> It is not very clear to me how Spark distributes tasks on the added
> executors and the source of delay. As you have observed there is a delay in
> adding new resources and allocating tasks. If that process is efficient?
>
> Thanks
>
>    view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Fri, 4 Feb 2022 at 03:04, Maksim Grinman <ma...@resolute.ai> wrote:
>
>> It's actually on AWS EMR. The job bootstraps and runs fine -- the
>> autoscaling group is to bring up a service that spark will be calling. Some
>> code waits for the autoscaling group to come up before continuing
>> processing in Spark, since the Spark cluster will need to make requests to
>> the service in the autoscaling group. It takes several minutes for the
>> service to come up, and during the wait, Spark starts to show these thread
>> dumps, as presumably it thinks something is wrong since the executor is
>> busy waiting and not doing anything. The previous version of Spark did not
>> do this (2.4.4).
>>
>> On Thu, Feb 3, 2022 at 6:59 PM Mich Talebzadeh <mi...@gmail.com>
>> wrote:
>>
>>> Sounds like you are running this on Google Dataproc cluster (spark
>>> 3.1.2)  with auto scaling policy?
>>>
>>>  Can you describe if this happens before Spark starts a new job on the
>>> cluster or somehow half way through processing an existing job?
>>>
>>> Also is the job involved doing Spark Structured Streaming?
>>>
>>> HTH
>>>
>>>
>>>
>>>    view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Thu, 3 Feb 2022 at 21:29, Maksim Grinman <ma...@resolute.ai> wrote:
>>>
>>>> We've got a spark task that, after some processing, starts an
>>>> autoscaling group and waits for it to be up before continuing processing.
>>>> While waiting for the autoscaling group, spark starts throwing full thread
>>>> dumps, presumably at the spark.executor.heartbeat interval. Is there a way
>>>> to prevent the thread dumps?
>>>>
>>>> --
>>>> Maksim Grinman
>>>> VP Engineering
>>>> Resolute AI
>>>>
>>>
>>
>> --
>> Maksim Grinman
>> VP Engineering
>> Resolute AI
>>
>

Re: Spark 3.1.2 full thread dumps

Posted by Mich Talebzadeh <mi...@gmail.com>.
Thanks for the info.

My concern has always been on how Spark handles autoscaling (adding new
executors) when the load pattern changes.I have tried to test this with
setting the following parameters (Spark 3.1.2 on GCP)

        spark-submit --verbose \
        .......
          --conf spark.dynamicAllocation.enabled="true" \
           --conf spark.shuffle.service.enabled="true" \
           --conf spark.dynamicAllocation.minExecutors=2 \
           --conf spark.dynamicAllocation.maxExecutors=10 \
           --conf spark.dynamicAllocation.initialExecutors=4 \

It is not very clear to me how Spark distributes tasks on the added
executors and the source of delay. As you have observed there is a delay in
adding new resources and allocating tasks. If that process is efficient?

Thanks

   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Fri, 4 Feb 2022 at 03:04, Maksim Grinman <ma...@resolute.ai> wrote:

> It's actually on AWS EMR. The job bootstraps and runs fine -- the
> autoscaling group is to bring up a service that spark will be calling. Some
> code waits for the autoscaling group to come up before continuing
> processing in Spark, since the Spark cluster will need to make requests to
> the service in the autoscaling group. It takes several minutes for the
> service to come up, and during the wait, Spark starts to show these thread
> dumps, as presumably it thinks something is wrong since the executor is
> busy waiting and not doing anything. The previous version of Spark did not
> do this (2.4.4).
>
> On Thu, Feb 3, 2022 at 6:59 PM Mich Talebzadeh <mi...@gmail.com>
> wrote:
>
>> Sounds like you are running this on Google Dataproc cluster (spark
>> 3.1.2)  with auto scaling policy?
>>
>>  Can you describe if this happens before Spark starts a new job on the
>> cluster or somehow half way through processing an existing job?
>>
>> Also is the job involved doing Spark Structured Streaming?
>>
>> HTH
>>
>>
>>
>>    view my Linkedin profile
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Thu, 3 Feb 2022 at 21:29, Maksim Grinman <ma...@resolute.ai> wrote:
>>
>>> We've got a spark task that, after some processing, starts an
>>> autoscaling group and waits for it to be up before continuing processing.
>>> While waiting for the autoscaling group, spark starts throwing full thread
>>> dumps, presumably at the spark.executor.heartbeat interval. Is there a way
>>> to prevent the thread dumps?
>>>
>>> --
>>> Maksim Grinman
>>> VP Engineering
>>> Resolute AI
>>>
>>
>
> --
> Maksim Grinman
> VP Engineering
> Resolute AI
>

Re: Spark 3.1.2 full thread dumps

Posted by Maksim Grinman <ma...@resolute.ai>.
It's actually on AWS EMR. The job bootstraps and runs fine -- the
autoscaling group is to bring up a service that spark will be calling. Some
code waits for the autoscaling group to come up before continuing
processing in Spark, since the Spark cluster will need to make requests to
the service in the autoscaling group. It takes several minutes for the
service to come up, and during the wait, Spark starts to show these thread
dumps, as presumably it thinks something is wrong since the executor is
busy waiting and not doing anything. The previous version of Spark did not
do this (2.4.4).

On Thu, Feb 3, 2022 at 6:59 PM Mich Talebzadeh <mi...@gmail.com>
wrote:

> Sounds like you are running this on Google Dataproc cluster (spark 3.1.2)
> with auto scaling policy?
>
>  Can you describe if this happens before Spark starts a new job on the
> cluster or somehow half way through processing an existing job?
>
> Also is the job involved doing Spark Structured Streaming?
>
> HTH
>
>
>
>    view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Thu, 3 Feb 2022 at 21:29, Maksim Grinman <ma...@resolute.ai> wrote:
>
>> We've got a spark task that, after some processing, starts an autoscaling
>> group and waits for it to be up before continuing processing. While waiting
>> for the autoscaling group, spark starts throwing full thread dumps,
>> presumably at the spark.executor.heartbeat interval. Is there a way to
>> prevent the thread dumps?
>>
>> --
>> Maksim Grinman
>> VP Engineering
>> Resolute AI
>>
>

-- 
Maksim Grinman
VP Engineering
Resolute AI

Re: Spark 3.1.2 full thread dumps

Posted by Mich Talebzadeh <mi...@gmail.com>.
Sounds like you are running this on Google Dataproc cluster (spark 3.1.2)
with auto scaling policy?

 Can you describe if this happens before Spark starts a new job on the
cluster or somehow half way through processing an existing job?

Also is the job involved doing Spark Structured Streaming?

HTH



   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Thu, 3 Feb 2022 at 21:29, Maksim Grinman <ma...@resolute.ai> wrote:

> We've got a spark task that, after some processing, starts an autoscaling
> group and waits for it to be up before continuing processing. While waiting
> for the autoscaling group, spark starts throwing full thread dumps,
> presumably at the spark.executor.heartbeat interval. Is there a way to
> prevent the thread dumps?
>
> --
> Maksim Grinman
> VP Engineering
> Resolute AI
>