You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Igor Calabria <ig...@gmail.com> on 2022/09/29 18:12:07 UTC

Help with Shuffle Read performance

Hi Everyone,

I'm running spark 3.2 on kubernetes and have a job with a decently sized
shuffle of almost 4TB. The relevant cluster config is as follows:

- 30 Executors. 16 physical cores, configured with 32 Cores for spark
- 128 GB RAM
-  shuffle.partitions is 18k which gives me tasks of around 150~180MB

The job runs fine but I'm bothered by how underutilized the cluster gets
during the reduce phase. During the map(reading data from s3 and writing
the shuffle data) CPU usage, disk throughput and network usage is as
expected, but during the reduce phase it gets really low. It seems the main
bottleneck is reading shuffle data from other nodes, task statistics
reports values ranging from 25s to several minutes(the task sizes are
really close, they aren't skewed). I've tried increasing
"spark.reducer.maxSizeInFlight" and
"spark.shuffle.io.numConnectionsPerPeer" and it did improve performance by
a little, but not enough to saturate the cluster resources.

Did I miss some more tuning parameters that could help?
One obvious thing would be to vertically increase the machines and use less
nodes to minimize traffic, but 30 nodes doesn't seem like much even
considering 30x30 connections.

Thanks in advance!

Re: Help with Shuffle Read performance

Posted by Gourav Sengupta <go...@gmail.com>.
Hi Leszek,

spot on,  therefore EMR being created and dynamically scaled up and down
and being ephemeral proves that there is actually no advantage of using
containers for large jobs.

It is utterly pointless and I have attended interviews and workshops where
no one has ever been able to prove its value inplace of EMR's.

That said, if you have a large system, and want to use two different
containers in it for two different jobs with different binaries in them,
then container is the way to go. Also it may be useful jobs which requires
 low memory and disk space and requires low latency response time like real
time processing - but in that case why use SPARK and not something like
Kafka SQL, etc.

So this entire containers nonsense is utterly pointless, and useless from
everyway other than devops engineers making pointless arguments to create
and spend huge budget and money as very few organisations need low latency
variable throughput multiple binary hosting systems


Regards,
Gourav

On Fri, Sep 30, 2022 at 5:32 AM Leszek Reimus <le...@gmail.com>
wrote:

> Hi Everyone,
>
> To add my 2 cents here:
>
> Advantage of containers, to me, is that it leaves the host system pristine
> and clean, allowing standardized devops deployment of hardware for any
> purpose. Way back before - when using bare metal / ansible, reusing hw
> always involved full reformat of base system. This alone is worth the ~1-2%
> performance tax cgroup containers have.
>
> Advantage of kubernetes is more on the deployment side of things. Unified
> deployment scripts that can be written by devs. Same deployment yaml (or
> helm chart) can be used on local Dev Env / QA / Integration Env and finally
> Prod (with some tweaks).
>
> Depending on the networking CNI, and storage backend - Kubernetes can have
> a very close to bare metal performance. In the end it is always a
> trade-off. You gain some, you pay with extra overhead.
>
> I'm running YARN on kubernetes and mostly run Spark on top of YARN (some
> legacy MapReduce jobs too though) . Finding it much more manageable to
> allocate larger memory/cpu chunks to yarn pods and then have run
> auto-scaler to scale out YARN if needed; than to manage individual
> memory/cpu requirements on Spark on Kubernetes deployment.
>
> As far as I tested, Spark on Kubernetes is immature when reliability is
> concerned (or maybe our homegrown k8s does not do fencing/STONITH well
> yet). When a node dies / goes down, I find executors not getting
> rescheduled to other nodes - the driver just gets stuck for the executors
> to come back. This does not happen on YARN / Standalone deployment (even
> when ran on same k8s cluster)
>
> Sincerely,
>
> Leszek Reimus
>
>
>
>
> On Thu, Sep 29, 2022 at 7:06 PM Gourav Sengupta <go...@gmail.com>
> wrote:
>
>> Hi,
>>
>> dont containers finally run on systems, and the only advantage of
>> containers is that you can do better utilisation of system resources by
>> micro management of jobs running in it? Some say that containers have their
>> own binaries which isolates environment, but that is a lie, because in a
>> kubernetes environments that is running your SPARK jobs you will have the
>> same environment for all your kubes.
>>
>> And as you can see there are several other configurations, disk mounting,
>> security, etc issues to handle as an overhead as well.
>>
>> And the entire goal of all those added configurations is that someone in
>> your devops team feels using containers makes things more interesting
>> without any real added advantage to large volume jobs.
>>
>> But I may be wrong, and perhaps we need data, and not personal attacks
>> like the other person in the thread did.
>>
>> In case anyone does not know EMR does run on containers as well, and in
>> EMR running on EC2 nodes you can put all your binaries in containers and
>> use those for running your jobs.
>>
>> Regards,
>> Gourav Sengupta
>>
>> On Thu, Sep 29, 2022 at 7:46 PM Vladimir Prus <vl...@gmail.com>
>> wrote:
>>
>>> Igor,
>>>
>>> what exact instance types do you use? Unless you use local instance
>>> storage and have actually configured your Kubernetes and Spark to use
>>> instance storage, your 30x30 exchange can run into EBS IOPS limits. You can
>>> investigate that by going to an instance, then to volume, and see
>>> monitoring charts.
>>>
>>> Another thought is that you're essentially giving 4GB per core. That
>>> sounds pretty low, in my experience.
>>>
>>>
>>>
>>> On Thu, Sep 29, 2022 at 9:13 PM Igor Calabria <ig...@gmail.com>
>>> wrote:
>>>
>>>> Hi Everyone,
>>>>
>>>> I'm running spark 3.2 on kubernetes and have a job with a decently
>>>> sized shuffle of almost 4TB. The relevant cluster config is as follows:
>>>>
>>>> - 30 Executors. 16 physical cores, configured with 32 Cores for spark
>>>> - 128 GB RAM
>>>> -  shuffle.partitions is 18k which gives me tasks of around 150~180MB
>>>>
>>>> The job runs fine but I'm bothered by how underutilized the cluster
>>>> gets during the reduce phase. During the map(reading data from s3 and
>>>> writing the shuffle data) CPU usage, disk throughput and network usage is
>>>> as expected, but during the reduce phase it gets really low. It seems the
>>>> main bottleneck is reading shuffle data from other nodes, task statistics
>>>> reports values ranging from 25s to several minutes(the task sizes are
>>>> really close, they aren't skewed). I've tried increasing
>>>> "spark.reducer.maxSizeInFlight" and
>>>> "spark.shuffle.io.numConnectionsPerPeer" and it did improve performance by
>>>> a little, but not enough to saturate the cluster resources.
>>>>
>>>> Did I miss some more tuning parameters that could help?
>>>> One obvious thing would be to vertically increase the machines and use
>>>> less nodes to minimize traffic, but 30 nodes doesn't seem like much even
>>>> considering 30x30 connections.
>>>>
>>>> Thanks in advance!
>>>>
>>>>
>>>
>>> --
>>> Vladimir Prus
>>> http://vladimirprus.com
>>>
>>
>
> --
> --------------
> "It is the common fate of the indolent to see their rights become a prey
> to the active. The condition upon which God hath given liberty to man is
> eternal vigilance; which condition if he break, servitude is at once the
> consequence of his crime and the punishment of his guilt." - John Philpot
> Curran: Speech upon the Right of Election, 1790.
>

Re: Help with Shuffle Read performance

Posted by Leszek Reimus <le...@gmail.com>.
Hi Sungwoo,

I tend to agree - for a new system, I would probably not go that route, as
Spark on Kubernetes is getting there and can do a lot already. Issue I
mentioned before can be fixed with proper node fencing - it is a typical
stateful set problem Kubernetes has without fencing - node goes down, k8s
refuses to relocate stateful set pod because it did not get confirmation
that old pod was shut down or not, since lack of communication != shut down
- it can be split brain too. Since it operates on "at most one" principle -
it will not allow 2nd pod up.

But keeping the 2nd layer of resource abstraction has its pluses as well.
From k8s perspective you allocate memory/cpu as a block to the entire
YARN/Spark, making the allocation easier. In the end - ability to support
Spark 1.6, MR, Spark 2.4 and Spark 3.1 jobs in one system is what made it
for us.

It also makes it easier when you want full data locality with shortcut HDFS
reads. Since as far as I know Spark on Kubernetes stopped talking about
that - and in my experience data locality can speed up Spark by large
factor.

Sincerely,

Leszek Reimus




On Fri, Sep 30, 2022 at 2:25 AM Sungwoo Park <gl...@gmail.com> wrote:

> Hi Leszek,
>
> For running YARN on Kubernetes and then running Spark on YARN, is there a
> lot of overhead for maintaining YARN on Kubernetes? I thought people
> usually want to move from YARN to Kubernetes because of the overhead of
> maintaining Hadoop.
>
> Thanks,
>
> --- Sungwoo
>
>
> On Fri, Sep 30, 2022 at 1:37 PM Leszek Reimus <le...@gmail.com>
> wrote:
>
>> Hi Everyone,
>>
>> To add my 2 cents here:
>>
>> Advantage of containers, to me, is that it leaves the host system
>> pristine and clean, allowing standardized devops deployment of hardware for
>> any purpose. Way back before - when using bare metal / ansible, reusing hw
>> always involved full reformat of base system. This alone is worth the ~1-2%
>> performance tax cgroup containers have.
>>
>> Advantage of kubernetes is more on the deployment side of things. Unified
>> deployment scripts that can be written by devs. Same deployment yaml (or
>> helm chart) can be used on local Dev Env / QA / Integration Env and finally
>> Prod (with some tweaks).
>>
>> Depending on the networking CNI, and storage backend - Kubernetes can
>> have a very close to bare metal performance. In the end it is always a
>> trade-off. You gain some, you pay with extra overhead.
>>
>> I'm running YARN on kubernetes and mostly run Spark on top of YARN (some
>> legacy MapReduce jobs too though) . Finding it much more manageable to
>> allocate larger memory/cpu chunks to yarn pods and then have run
>> auto-scaler to scale out YARN if needed; than to manage individual
>> memory/cpu requirements on Spark on Kubernetes deployment.
>>
>> As far as I tested, Spark on Kubernetes is immature when reliability is
>> concerned (or maybe our homegrown k8s does not do fencing/STONITH well
>> yet). When a node dies / goes down, I find executors not getting
>> rescheduled to other nodes - the driver just gets stuck for the executors
>> to come back. This does not happen on YARN / Standalone deployment (even
>> when ran on same k8s cluster)
>>
>> Sincerely,
>>
>> Leszek Reimus
>>
>>
>>
>>
>> On Thu, Sep 29, 2022 at 7:06 PM Gourav Sengupta <
>> gourav.sengupta@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> dont containers finally run on systems, and the only advantage of
>>> containers is that you can do better utilisation of system resources by
>>> micro management of jobs running in it? Some say that containers have their
>>> own binaries which isolates environment, but that is a lie, because in a
>>> kubernetes environments that is running your SPARK jobs you will have the
>>> same environment for all your kubes.
>>>
>>> And as you can see there are several other configurations, disk
>>> mounting, security, etc issues to handle as an overhead as well.
>>>
>>> And the entire goal of all those added configurations is that someone in
>>> your devops team feels using containers makes things more interesting
>>> without any real added advantage to large volume jobs.
>>>
>>> But I may be wrong, and perhaps we need data, and not personal attacks
>>> like the other person in the thread did.
>>>
>>> In case anyone does not know EMR does run on containers as well, and in
>>> EMR running on EC2 nodes you can put all your binaries in containers and
>>> use those for running your jobs.
>>>
>>> Regards,
>>> Gourav Sengupta
>>>
>>> On Thu, Sep 29, 2022 at 7:46 PM Vladimir Prus <vl...@gmail.com>
>>> wrote:
>>>
>>>> Igor,
>>>>
>>>> what exact instance types do you use? Unless you use local instance
>>>> storage and have actually configured your Kubernetes and Spark to use
>>>> instance storage, your 30x30 exchange can run into EBS IOPS limits. You can
>>>> investigate that by going to an instance, then to volume, and see
>>>> monitoring charts.
>>>>
>>>> Another thought is that you're essentially giving 4GB per core. That
>>>> sounds pretty low, in my experience.
>>>>
>>>>
>>>>
>>>> On Thu, Sep 29, 2022 at 9:13 PM Igor Calabria <ig...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Everyone,
>>>>>
>>>>> I'm running spark 3.2 on kubernetes and have a job with a decently
>>>>> sized shuffle of almost 4TB. The relevant cluster config is as follows:
>>>>>
>>>>> - 30 Executors. 16 physical cores, configured with 32 Cores for spark
>>>>> - 128 GB RAM
>>>>> -  shuffle.partitions is 18k which gives me tasks of around 150~180MB
>>>>>
>>>>> The job runs fine but I'm bothered by how underutilized the cluster
>>>>> gets during the reduce phase. During the map(reading data from s3 and
>>>>> writing the shuffle data) CPU usage, disk throughput and network usage is
>>>>> as expected, but during the reduce phase it gets really low. It seems the
>>>>> main bottleneck is reading shuffle data from other nodes, task statistics
>>>>> reports values ranging from 25s to several minutes(the task sizes are
>>>>> really close, they aren't skewed). I've tried increasing
>>>>> "spark.reducer.maxSizeInFlight" and
>>>>> "spark.shuffle.io.numConnectionsPerPeer" and it did improve performance by
>>>>> a little, but not enough to saturate the cluster resources.
>>>>>
>>>>> Did I miss some more tuning parameters that could help?
>>>>> One obvious thing would be to vertically increase the machines and use
>>>>> less nodes to minimize traffic, but 30 nodes doesn't seem like much even
>>>>> considering 30x30 connections.
>>>>>
>>>>> Thanks in advance!
>>>>>
>>>>>
>>>>
>>>> --
>>>> Vladimir Prus
>>>> http://vladimirprus.com
>>>>
>>>
>>
>> --
>> --------------
>> "It is the common fate of the indolent to see their rights become a prey
>> to the active. The condition upon which God hath given liberty to man is
>> eternal vigilance; which condition if he break, servitude is at once the
>> consequence of his crime and the punishment of his guilt." - John Philpot
>> Curran: Speech upon the Right of Election, 1790.
>>
>

-- 
--------------
"It is the common fate of the indolent to see their rights become a prey to
the active. The condition upon which God hath given liberty to man is
eternal vigilance; which condition if he break, servitude is at once the
consequence of his crime and the punishment of his guilt." - John Philpot
Curran: Speech upon the Right of Election, 1790.

Re: Help with Shuffle Read performance

Posted by Sungwoo Park <gl...@gmail.com>.
Hi Leszek,

For running YARN on Kubernetes and then running Spark on YARN, is there a
lot of overhead for maintaining YARN on Kubernetes? I thought people
usually want to move from YARN to Kubernetes because of the overhead of
maintaining Hadoop.

Thanks,

--- Sungwoo


On Fri, Sep 30, 2022 at 1:37 PM Leszek Reimus <le...@gmail.com>
wrote:

> Hi Everyone,
>
> To add my 2 cents here:
>
> Advantage of containers, to me, is that it leaves the host system pristine
> and clean, allowing standardized devops deployment of hardware for any
> purpose. Way back before - when using bare metal / ansible, reusing hw
> always involved full reformat of base system. This alone is worth the ~1-2%
> performance tax cgroup containers have.
>
> Advantage of kubernetes is more on the deployment side of things. Unified
> deployment scripts that can be written by devs. Same deployment yaml (or
> helm chart) can be used on local Dev Env / QA / Integration Env and finally
> Prod (with some tweaks).
>
> Depending on the networking CNI, and storage backend - Kubernetes can have
> a very close to bare metal performance. In the end it is always a
> trade-off. You gain some, you pay with extra overhead.
>
> I'm running YARN on kubernetes and mostly run Spark on top of YARN (some
> legacy MapReduce jobs too though) . Finding it much more manageable to
> allocate larger memory/cpu chunks to yarn pods and then have run
> auto-scaler to scale out YARN if needed; than to manage individual
> memory/cpu requirements on Spark on Kubernetes deployment.
>
> As far as I tested, Spark on Kubernetes is immature when reliability is
> concerned (or maybe our homegrown k8s does not do fencing/STONITH well
> yet). When a node dies / goes down, I find executors not getting
> rescheduled to other nodes - the driver just gets stuck for the executors
> to come back. This does not happen on YARN / Standalone deployment (even
> when ran on same k8s cluster)
>
> Sincerely,
>
> Leszek Reimus
>
>
>
>
> On Thu, Sep 29, 2022 at 7:06 PM Gourav Sengupta <go...@gmail.com>
> wrote:
>
>> Hi,
>>
>> dont containers finally run on systems, and the only advantage of
>> containers is that you can do better utilisation of system resources by
>> micro management of jobs running in it? Some say that containers have their
>> own binaries which isolates environment, but that is a lie, because in a
>> kubernetes environments that is running your SPARK jobs you will have the
>> same environment for all your kubes.
>>
>> And as you can see there are several other configurations, disk mounting,
>> security, etc issues to handle as an overhead as well.
>>
>> And the entire goal of all those added configurations is that someone in
>> your devops team feels using containers makes things more interesting
>> without any real added advantage to large volume jobs.
>>
>> But I may be wrong, and perhaps we need data, and not personal attacks
>> like the other person in the thread did.
>>
>> In case anyone does not know EMR does run on containers as well, and in
>> EMR running on EC2 nodes you can put all your binaries in containers and
>> use those for running your jobs.
>>
>> Regards,
>> Gourav Sengupta
>>
>> On Thu, Sep 29, 2022 at 7:46 PM Vladimir Prus <vl...@gmail.com>
>> wrote:
>>
>>> Igor,
>>>
>>> what exact instance types do you use? Unless you use local instance
>>> storage and have actually configured your Kubernetes and Spark to use
>>> instance storage, your 30x30 exchange can run into EBS IOPS limits. You can
>>> investigate that by going to an instance, then to volume, and see
>>> monitoring charts.
>>>
>>> Another thought is that you're essentially giving 4GB per core. That
>>> sounds pretty low, in my experience.
>>>
>>>
>>>
>>> On Thu, Sep 29, 2022 at 9:13 PM Igor Calabria <ig...@gmail.com>
>>> wrote:
>>>
>>>> Hi Everyone,
>>>>
>>>> I'm running spark 3.2 on kubernetes and have a job with a decently
>>>> sized shuffle of almost 4TB. The relevant cluster config is as follows:
>>>>
>>>> - 30 Executors. 16 physical cores, configured with 32 Cores for spark
>>>> - 128 GB RAM
>>>> -  shuffle.partitions is 18k which gives me tasks of around 150~180MB
>>>>
>>>> The job runs fine but I'm bothered by how underutilized the cluster
>>>> gets during the reduce phase. During the map(reading data from s3 and
>>>> writing the shuffle data) CPU usage, disk throughput and network usage is
>>>> as expected, but during the reduce phase it gets really low. It seems the
>>>> main bottleneck is reading shuffle data from other nodes, task statistics
>>>> reports values ranging from 25s to several minutes(the task sizes are
>>>> really close, they aren't skewed). I've tried increasing
>>>> "spark.reducer.maxSizeInFlight" and
>>>> "spark.shuffle.io.numConnectionsPerPeer" and it did improve performance by
>>>> a little, but not enough to saturate the cluster resources.
>>>>
>>>> Did I miss some more tuning parameters that could help?
>>>> One obvious thing would be to vertically increase the machines and use
>>>> less nodes to minimize traffic, but 30 nodes doesn't seem like much even
>>>> considering 30x30 connections.
>>>>
>>>> Thanks in advance!
>>>>
>>>>
>>>
>>> --
>>> Vladimir Prus
>>> http://vladimirprus.com
>>>
>>
>
> --
> --------------
> "It is the common fate of the indolent to see their rights become a prey
> to the active. The condition upon which God hath given liberty to man is
> eternal vigilance; which condition if he break, servitude is at once the
> consequence of his crime and the punishment of his guilt." - John Philpot
> Curran: Speech upon the Right of Election, 1790.
>

Re: Help with Shuffle Read performance

Posted by Leszek Reimus <le...@gmail.com>.
Hi Everyone,

To add my 2 cents here:

Advantage of containers, to me, is that it leaves the host system pristine
and clean, allowing standardized devops deployment of hardware for any
purpose. Way back before - when using bare metal / ansible, reusing hw
always involved full reformat of base system. This alone is worth the ~1-2%
performance tax cgroup containers have.

Advantage of kubernetes is more on the deployment side of things. Unified
deployment scripts that can be written by devs. Same deployment yaml (or
helm chart) can be used on local Dev Env / QA / Integration Env and finally
Prod (with some tweaks).

Depending on the networking CNI, and storage backend - Kubernetes can have
a very close to bare metal performance. In the end it is always a
trade-off. You gain some, you pay with extra overhead.

I'm running YARN on kubernetes and mostly run Spark on top of YARN (some
legacy MapReduce jobs too though) . Finding it much more manageable to
allocate larger memory/cpu chunks to yarn pods and then have run
auto-scaler to scale out YARN if needed; than to manage individual
memory/cpu requirements on Spark on Kubernetes deployment.

As far as I tested, Spark on Kubernetes is immature when reliability is
concerned (or maybe our homegrown k8s does not do fencing/STONITH well
yet). When a node dies / goes down, I find executors not getting
rescheduled to other nodes - the driver just gets stuck for the executors
to come back. This does not happen on YARN / Standalone deployment (even
when ran on same k8s cluster)

Sincerely,

Leszek Reimus




On Thu, Sep 29, 2022 at 7:06 PM Gourav Sengupta <go...@gmail.com>
wrote:

> Hi,
>
> dont containers finally run on systems, and the only advantage of
> containers is that you can do better utilisation of system resources by
> micro management of jobs running in it? Some say that containers have their
> own binaries which isolates environment, but that is a lie, because in a
> kubernetes environments that is running your SPARK jobs you will have the
> same environment for all your kubes.
>
> And as you can see there are several other configurations, disk mounting,
> security, etc issues to handle as an overhead as well.
>
> And the entire goal of all those added configurations is that someone in
> your devops team feels using containers makes things more interesting
> without any real added advantage to large volume jobs.
>
> But I may be wrong, and perhaps we need data, and not personal attacks
> like the other person in the thread did.
>
> In case anyone does not know EMR does run on containers as well, and in
> EMR running on EC2 nodes you can put all your binaries in containers and
> use those for running your jobs.
>
> Regards,
> Gourav Sengupta
>
> On Thu, Sep 29, 2022 at 7:46 PM Vladimir Prus <vl...@gmail.com>
> wrote:
>
>> Igor,
>>
>> what exact instance types do you use? Unless you use local instance
>> storage and have actually configured your Kubernetes and Spark to use
>> instance storage, your 30x30 exchange can run into EBS IOPS limits. You can
>> investigate that by going to an instance, then to volume, and see
>> monitoring charts.
>>
>> Another thought is that you're essentially giving 4GB per core. That
>> sounds pretty low, in my experience.
>>
>>
>>
>> On Thu, Sep 29, 2022 at 9:13 PM Igor Calabria <ig...@gmail.com>
>> wrote:
>>
>>> Hi Everyone,
>>>
>>> I'm running spark 3.2 on kubernetes and have a job with a decently sized
>>> shuffle of almost 4TB. The relevant cluster config is as follows:
>>>
>>> - 30 Executors. 16 physical cores, configured with 32 Cores for spark
>>> - 128 GB RAM
>>> -  shuffle.partitions is 18k which gives me tasks of around 150~180MB
>>>
>>> The job runs fine but I'm bothered by how underutilized the cluster gets
>>> during the reduce phase. During the map(reading data from s3 and writing
>>> the shuffle data) CPU usage, disk throughput and network usage is as
>>> expected, but during the reduce phase it gets really low. It seems the main
>>> bottleneck is reading shuffle data from other nodes, task statistics
>>> reports values ranging from 25s to several minutes(the task sizes are
>>> really close, they aren't skewed). I've tried increasing
>>> "spark.reducer.maxSizeInFlight" and
>>> "spark.shuffle.io.numConnectionsPerPeer" and it did improve performance by
>>> a little, but not enough to saturate the cluster resources.
>>>
>>> Did I miss some more tuning parameters that could help?
>>> One obvious thing would be to vertically increase the machines and use
>>> less nodes to minimize traffic, but 30 nodes doesn't seem like much even
>>> considering 30x30 connections.
>>>
>>> Thanks in advance!
>>>
>>>
>>
>> --
>> Vladimir Prus
>> http://vladimirprus.com
>>
>

-- 
--------------
"It is the common fate of the indolent to see their rights become a prey to
the active. The condition upon which God hath given liberty to man is
eternal vigilance; which condition if he break, servitude is at once the
consequence of his crime and the punishment of his guilt." - John Philpot
Curran: Speech upon the Right of Election, 1790.

Re: Help with Shuffle Read performance

Posted by Gourav Sengupta <go...@gmail.com>.
Hi,

dont containers finally run on systems, and the only advantage of
containers is that you can do better utilisation of system resources by
micro management of jobs running in it? Some say that containers have their
own binaries which isolates environment, but that is a lie, because in a
kubernetes environments that is running your SPARK jobs you will have the
same environment for all your kubes.

And as you can see there are several other configurations, disk mounting,
security, etc issues to handle as an overhead as well.

And the entire goal of all those added configurations is that someone in
your devops team feels using containers makes things more interesting
without any real added advantage to large volume jobs.

But I may be wrong, and perhaps we need data, and not personal attacks like
the other person in the thread did.

In case anyone does not know EMR does run on containers as well, and in EMR
running on EC2 nodes you can put all your binaries in containers and use
those for running your jobs.

Regards,
Gourav Sengupta

On Thu, Sep 29, 2022 at 7:46 PM Vladimir Prus <vl...@gmail.com>
wrote:

> Igor,
>
> what exact instance types do you use? Unless you use local instance
> storage and have actually configured your Kubernetes and Spark to use
> instance storage, your 30x30 exchange can run into EBS IOPS limits. You can
> investigate that by going to an instance, then to volume, and see
> monitoring charts.
>
> Another thought is that you're essentially giving 4GB per core. That
> sounds pretty low, in my experience.
>
>
>
> On Thu, Sep 29, 2022 at 9:13 PM Igor Calabria <ig...@gmail.com>
> wrote:
>
>> Hi Everyone,
>>
>> I'm running spark 3.2 on kubernetes and have a job with a decently sized
>> shuffle of almost 4TB. The relevant cluster config is as follows:
>>
>> - 30 Executors. 16 physical cores, configured with 32 Cores for spark
>> - 128 GB RAM
>> -  shuffle.partitions is 18k which gives me tasks of around 150~180MB
>>
>> The job runs fine but I'm bothered by how underutilized the cluster gets
>> during the reduce phase. During the map(reading data from s3 and writing
>> the shuffle data) CPU usage, disk throughput and network usage is as
>> expected, but during the reduce phase it gets really low. It seems the main
>> bottleneck is reading shuffle data from other nodes, task statistics
>> reports values ranging from 25s to several minutes(the task sizes are
>> really close, they aren't skewed). I've tried increasing
>> "spark.reducer.maxSizeInFlight" and
>> "spark.shuffle.io.numConnectionsPerPeer" and it did improve performance by
>> a little, but not enough to saturate the cluster resources.
>>
>> Did I miss some more tuning parameters that could help?
>> One obvious thing would be to vertically increase the machines and use
>> less nodes to minimize traffic, but 30 nodes doesn't seem like much even
>> considering 30x30 connections.
>>
>> Thanks in advance!
>>
>>
>
> --
> Vladimir Prus
> http://vladimirprus.com
>

Re: Help with Shuffle Read performance

Posted by Igor Calabria <ig...@gmail.com>.
> What's the total number of Partitions that you have ?
18k

> What machines are you using ? Are you using an SSD ?
Using a family of r5.4xlarges nodes. Yes I'm using five GP3 Disks which
gives me about 625 MB/s of sustained throughput (which is what I see when
writing the shuffle data).

> can you please provide whats the size of the shuffle file that is getting
generated in each task .
I have to check that. But total sizes of tasks are around 150 ~ 180 MB

> what exact instance types do you use? Unless you use local instance
storage and have actually configured your Kubernetes and Spark to use
instance storage, your 30x30 exchange can run into EBS IOPS limits. You can
investigate that by going to an instance, then to volume, and see
monitoring charts.
That's a pretty good hint. I forgot to check the IOPs limit, was only
looking at the throughput.

> Another thought is that you're essentially giving 4GB per core. That
sounds pretty low, in my experience.
For this particular job, it seems fine. No long GCs and peak usage per task
was reported at 1.4, so plenty of room.

Thanks a lot for the responses. I'm betting this is related to EBS IOPS
limits, since most of our jobs use instances with local disks instead.

On Thu, Sep 29, 2022 at 7:44 PM Vladimir Prus <vl...@gmail.com>
wrote:

> Igor,
>
> what exact instance types do you use? Unless you use local instance
> storage and have actually configured your Kubernetes and Spark to use
> instance storage, your 30x30 exchange can run into EBS IOPS limits. You can
> investigate that by going to an instance, then to volume, and see
> monitoring charts.
>
> Another thought is that you're essentially giving 4GB per core. That
> sounds pretty low, in my experience.
>
>
>
> On Thu, Sep 29, 2022 at 9:13 PM Igor Calabria <ig...@gmail.com>
> wrote:
>
>> Hi Everyone,
>>
>> I'm running spark 3.2 on kubernetes and have a job with a decently sized
>> shuffle of almost 4TB. The relevant cluster config is as follows:
>>
>> - 30 Executors. 16 physical cores, configured with 32 Cores for spark
>> - 128 GB RAM
>> -  shuffle.partitions is 18k which gives me tasks of around 150~180MB
>>
>> The job runs fine but I'm bothered by how underutilized the cluster gets
>> during the reduce phase. During the map(reading data from s3 and writing
>> the shuffle data) CPU usage, disk throughput and network usage is as
>> expected, but during the reduce phase it gets really low. It seems the main
>> bottleneck is reading shuffle data from other nodes, task statistics
>> reports values ranging from 25s to several minutes(the task sizes are
>> really close, they aren't skewed). I've tried increasing
>> "spark.reducer.maxSizeInFlight" and
>> "spark.shuffle.io.numConnectionsPerPeer" and it did improve performance by
>> a little, but not enough to saturate the cluster resources.
>>
>> Did I miss some more tuning parameters that could help?
>> One obvious thing would be to vertically increase the machines and use
>> less nodes to minimize traffic, but 30 nodes doesn't seem like much even
>> considering 30x30 connections.
>>
>> Thanks in advance!
>>
>>
>
> --
> Vladimir Prus
> http://vladimirprus.com
>

Re: Help with Shuffle Read performance

Posted by Vladimir Prus <vl...@gmail.com>.
Igor,

what exact instance types do you use? Unless you use local instance storage
and have actually configured your Kubernetes and Spark to use instance
storage, your 30x30 exchange can run into EBS IOPS limits. You can
investigate that by going to an instance, then to volume, and see
monitoring charts.

Another thought is that you're essentially giving 4GB per core. That sounds
pretty low, in my experience.



On Thu, Sep 29, 2022 at 9:13 PM Igor Calabria <ig...@gmail.com>
wrote:

> Hi Everyone,
>
> I'm running spark 3.2 on kubernetes and have a job with a decently sized
> shuffle of almost 4TB. The relevant cluster config is as follows:
>
> - 30 Executors. 16 physical cores, configured with 32 Cores for spark
> - 128 GB RAM
> -  shuffle.partitions is 18k which gives me tasks of around 150~180MB
>
> The job runs fine but I'm bothered by how underutilized the cluster gets
> during the reduce phase. During the map(reading data from s3 and writing
> the shuffle data) CPU usage, disk throughput and network usage is as
> expected, but during the reduce phase it gets really low. It seems the main
> bottleneck is reading shuffle data from other nodes, task statistics
> reports values ranging from 25s to several minutes(the task sizes are
> really close, they aren't skewed). I've tried increasing
> "spark.reducer.maxSizeInFlight" and
> "spark.shuffle.io.numConnectionsPerPeer" and it did improve performance by
> a little, but not enough to saturate the cluster resources.
>
> Did I miss some more tuning parameters that could help?
> One obvious thing would be to vertically increase the machines and use
> less nodes to minimize traffic, but 30 nodes doesn't seem like much even
> considering 30x30 connections.
>
> Thanks in advance!
>
>

-- 
Vladimir Prus
http://vladimirprus.com

Re: Help with Shuffle Read performance

Posted by Tufan Rakshit <tu...@gmail.com>.
that's Total Nonsense , EMR is total  crap , use kubernetes i will help
you .
can you please provide whats the size of the shuffle file that is getting
generated in each task .
What's the total number of Partitions that you have ?
What machines are you using ? Are you using an SSD ?

Best
Tufan

On Thu, 29 Sept 2022 at 20:28, Gourav Sengupta <go...@gmail.com>
wrote:

> Hi,
>
> why not use EMR or data proc, kubernetes does not provide any benefit at
> all for such scale of work. It is a classical case of over engineering and
> over complication just for the heck of it.
>
> Also I think that in case you are in AWS, Redshift Spectrum or Athena for
> 90% of use cases are way optimal.
>
> Regards,
> Gourav
>
> On Thu, Sep 29, 2022 at 7:13 PM Igor Calabria <ig...@gmail.com>
> wrote:
>
>> Hi Everyone,
>>
>> I'm running spark 3.2 on kubernetes and have a job with a decently sized
>> shuffle of almost 4TB. The relevant cluster config is as follows:
>>
>> - 30 Executors. 16 physical cores, configured with 32 Cores for spark
>> - 128 GB RAM
>> -  shuffle.partitions is 18k which gives me tasks of around 150~180MB
>>
>> The job runs fine but I'm bothered by how underutilized the cluster gets
>> during the reduce phase. During the map(reading data from s3 and writing
>> the shuffle data) CPU usage, disk throughput and network usage is as
>> expected, but during the reduce phase it gets really low. It seems the main
>> bottleneck is reading shuffle data from other nodes, task statistics
>> reports values ranging from 25s to several minutes(the task sizes are
>> really close, they aren't skewed). I've tried increasing
>> "spark.reducer.maxSizeInFlight" and
>> "spark.shuffle.io.numConnectionsPerPeer" and it did improve performance by
>> a little, but not enough to saturate the cluster resources.
>>
>> Did I miss some more tuning parameters that could help?
>> One obvious thing would be to vertically increase the machines and use
>> less nodes to minimize traffic, but 30 nodes doesn't seem like much even
>> considering 30x30 connections.
>>
>> Thanks in advance!
>>
>>

Re: Help with Shuffle Read performance

Posted by Gourav Sengupta <go...@gmail.com>.
Hi,

why not use EMR or data proc, kubernetes does not provide any benefit at
all for such scale of work. It is a classical case of over engineering and
over complication just for the heck of it.

Also I think that in case you are in AWS, Redshift Spectrum or Athena for
90% of use cases are way optimal.

Regards,
Gourav

On Thu, Sep 29, 2022 at 7:13 PM Igor Calabria <ig...@gmail.com>
wrote:

> Hi Everyone,
>
> I'm running spark 3.2 on kubernetes and have a job with a decently sized
> shuffle of almost 4TB. The relevant cluster config is as follows:
>
> - 30 Executors. 16 physical cores, configured with 32 Cores for spark
> - 128 GB RAM
> -  shuffle.partitions is 18k which gives me tasks of around 150~180MB
>
> The job runs fine but I'm bothered by how underutilized the cluster gets
> during the reduce phase. During the map(reading data from s3 and writing
> the shuffle data) CPU usage, disk throughput and network usage is as
> expected, but during the reduce phase it gets really low. It seems the main
> bottleneck is reading shuffle data from other nodes, task statistics
> reports values ranging from 25s to several minutes(the task sizes are
> really close, they aren't skewed). I've tried increasing
> "spark.reducer.maxSizeInFlight" and
> "spark.shuffle.io.numConnectionsPerPeer" and it did improve performance by
> a little, but not enough to saturate the cluster resources.
>
> Did I miss some more tuning parameters that could help?
> One obvious thing would be to vertically increase the machines and use
> less nodes to minimize traffic, but 30 nodes doesn't seem like much even
> considering 30x30 connections.
>
> Thanks in advance!
>
>

Re: Help with Shuffle Read performance

Posted by Igor Calabria <ig...@gmail.com>.
Thanks a lot for the answers foks.

It turned out that spark was just IOPs starved. Using better disks solved
my issue, so nothing related to kubernetes at all.

Have a nice weekend everyone

On Fri, Sep 30, 2022 at 4:27 PM Artemis User <ar...@dtechspace.com> wrote:

> The reduce phase is always more resource-intensive than the map phase.
> Couple of suggestions you may want to consider:
>
>    1. Setting the number of partitions to 18K may be way too high (the
>    default number is only 200).  You may want to just use the default and the
>    scheduler will automatically increase the partitions if needed.
>    2. Turn on dynamic resource allocation (DRA) (
>    https://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation).
>    It would allow those executors that finish the map tasks returning the
>    resources (e.g. RAM, CPU cores) back to the cluster, and reallocate the
>    resources to the reduce tasks.  This feature (post Spark 3.0) is also
>    available to K8, but turned off by default.
>    3. With DRA turned on, you may want also try to play with a small
>    number of number of executors/nodes thus reducing shuffling needs, given
>    the fact that you only have 128GB RAM.
>
> Hope this helps...
>
> On 9/29/22 2:12 PM, Igor Calabria wrote:
>
> Hi Everyone,
>
> I'm running spark 3.2 on kubernetes and have a job with a decently sized
> shuffle of almost 4TB. The relevant cluster config is as follows:
>
> - 30 Executors. 16 physical cores, configured with 32 Cores for spark
> - 128 GB RAM
> -  shuffle.partitions is 18k which gives me tasks of around 150~180MB
>
> The job runs fine but I'm bothered by how underutilized the cluster gets
> during the reduce phase. During the map(reading data from s3 and writing
> the shuffle data) CPU usage, disk throughput and network usage is as
> expected, but during the reduce phase it gets really low. It seems the main
> bottleneck is reading shuffle data from other nodes, task statistics
> reports values ranging from 25s to several minutes(the task sizes are
> really close, they aren't skewed). I've tried increasing
> "spark.reducer.maxSizeInFlight" and
> "spark.shuffle.io.numConnectionsPerPeer" and it did improve performance by
> a little, but not enough to saturate the cluster resources.
>
> Did I miss some more tuning parameters that could help?
> One obvious thing would be to vertically increase the machines and use
> less nodes to minimize traffic, but 30 nodes doesn't seem like much even
> considering 30x30 connections.
>
> Thanks in advance!
>
>
>

Re: Help with Shuffle Read performance

Posted by Artemis User <ar...@dtechspace.com>.
The reduce phase is always more resource-intensive than the map phase.  
Couple of suggestions you may want to consider:

 1. Setting the number of partitions to 18K may be way too high (the
    default number is only 200).  You may want to just use the default
    and the scheduler will automatically increase the partitions if needed.
 2. Turn on dynamic resource allocation (DRA)
    (https://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation).
    It would allow those executors that finish the map tasks returning
    the resources (e.g. RAM, CPU cores) back to the cluster, and
    reallocate the resources to the reduce tasks.  This feature (post
    Spark 3.0) is also available to K8, but turned off by default.
 3. With DRA turned on, you may want also try to play with a small
    number of number of executors/nodes thus reducing shuffling needs,
    given the fact that you only have 128GB RAM.

Hope this helps...


On 9/29/22 2:12 PM, Igor Calabria wrote:
> Hi Everyone,
>
> I'm running spark 3.2 on kubernetes and have a job with a decently 
> sized shuffle of almost 4TB. The relevant cluster config is as follows:
>
> - 30 Executors. 16 physical cores, configured with 32 Cores for spark
> - 128 GB RAM
> -  shuffle.partitions is 18k which gives me tasks of around 150~180MB
>
> The job runs fine but I'm bothered by how underutilized the cluster 
> gets during the reduce phase. During the map(reading data from s3 and 
> writing the shuffle data) CPU usage, disk throughput and network usage 
> is as expected, but during the reduce phase it gets really low. It 
> seems the main bottleneck is reading shuffle data from other nodes, 
> task statistics reports values ranging from 25s to several minutes(the 
> task sizes are really close, they aren't skewed). I've tried 
> increasing "spark.reducer.maxSizeInFlight" and 
> "spark.shuffle.io.numConnectionsPerPeer" and it did improve 
> performance by a little, but not enough to saturate the cluster resources.
>
> Did I miss some more tuning parameters that could help?
> One obvious thing would be to vertically increase the machines and use 
> less nodes to minimize traffic, but 30 nodes doesn't seem like much 
> even considering 30x30 connections.
>
> Thanks in advance!
>