Posted to user@spark.apache.org by "15927907987@163.com" <15...@163.com> on 2022/02/10 09:30:09 UTC

Execution efficiency slows down as the number of CPU cores increases

Hello,
    I recently ran a test with Spark 3.2 on the TPC-DS dataset at the 1TB scale (stored in HDFS), and I encountered a problem that I could not understand. I hope to get your help.
    The SQL statement tested is as follows:
    select count(*) from (
        select distinct c_last_name, c_first_name, d_date
        from store_sales, date_dim, customer
        where store_sales.ss_sold_date_sk = date_dim.d_date_sk
          and store_sales.ss_customer_sk = customer.c_customer_sk
          and d_month_seq between 1200 and 1200 + 11
    ) hot_cust
    limit 100;

Three tables are used here, and their sizes are: store_sales (387.97GB), date_dim (9.84GB), customer (1.52GB)

When submitting the application, the driver and executor parameters are set as follows:
DRIVER_OPTIONS="--master spark://xx.xx.xx.xx:7078 --total-executor-cores 48 --conf spark.driver.memory=100g --conf spark.default.parallelism=100 --conf spark.sql.adaptive.enabled=false" 
 EXECUTOR_OPTIONS="--executor-cores 12 --conf spark.executor.memory=50g --conf spark.task.cpus=1 --conf spark.sql.shuffle.partitions=100 --conf spark.sql.crossJoin.enabled=true"

The Spark cluster runs on a single physical node with only one worker, and applications are submitted in standalone mode. The node has 56 cores and 376GB of memory in total. I kept the number of executors fixed at 4 and set executor-cores to 2, 4, 8, and 12. The corresponding application times were 96s, 66s, 50s, and 49s. Once total-executor-cores reaches 32 or more, the application time barely changes.
I monitored CPU utilization and IO and found no bottlenecks. I then analyzed the task times of the longest stage of each application in the Spark web UI. In all four test cases the longest stage is stage 4, which took 48s, 28s, 22s, and 23s respectively. As the number of CPU cores increases, the task time in stage 4 also increases: the average task time is 3s, 4s, 5s, and 7s. In addition, each executor takes longer to run its first batch of tasks than subsequent tasks, and the more cores there are, the greater the gap:

Test case  Total executor cores  Executor cores  Application time  Stage 4 time  Avg task time  First-batch task time  Later-batch task time
1          8                     2               96s               48s           3s             5-6s                   3-4s
2          16                    4               66s               28s           4s             7-8s                   ~4s
3          32                    8               50s               23s           5s             10-12s                 ~5s
4          48                    12              49s               22s           7s             14-15s                 ~6s
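
To put numbers on how far these runs are from linear scaling, here is a minimal Python sketch that takes only the application times listed above and derives speedup and parallel efficiency relative to the 8-core run. It also computes the Karp-Flatt serial fraction, which is a standard metric brought in here for illustration and not something used in the original test. The names app_time_s, BASELINE_CORES, and scaling_report are hypothetical.

```python
# Wall-clock application times reported in the post, keyed by total executor cores.
app_time_s = {8: 96, 16: 66, 32: 50, 48: 49}

BASELINE_CORES = 8  # smallest configuration, used as the reference point


def scaling_report(times, baseline):
    """Return (cores, speedup, efficiency, serial_fraction) rows relative to baseline."""
    rows = []
    for cores, seconds in sorted(times.items()):
        p = cores / baseline                 # how many times more cores than the baseline
        speedup = times[baseline] / seconds  # observed speedup over the baseline run
        efficiency = speedup / p             # 1.0 would be perfect linear scaling
        # Karp-Flatt experimentally determined serial fraction; undefined for p == 1.
        serial = None if p == 1 else (1 / speedup - 1 / p) / (1 - 1 / p)
        rows.append((cores, speedup, efficiency, serial))
    return rows


report = scaling_report(app_time_s, BASELINE_CORES)
for cores, speedup, efficiency, serial in report:
    serial_txt = "n/a" if serial is None else f"{serial:.2f}"
    print(f"{cores:>2} cores: speedup {speedup:.2f}x, "
          f"efficiency {efficiency:.2f}, serial fraction {serial_txt}")
```

With these inputs the efficiency relative to 8 cores drops to roughly 0.73 at 16 cores and roughly 0.33 at 48 cores, and the estimated non-scaling fraction stays around 0.36 to 0.41. That would be consistent with a sizeable part of the run not benefiting from extra cores, although the sketch cannot say which part that is.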

This result confuses me. Why does the execution time of individual tasks become longer after increasing the number of cores? I compared the amount of data processed by each task in the different cases, and it is the same. And why does the overall execution time stop improving once the number of cores reaches a certain point?
Looking forward to your reply.



15927907987@163.com

Re: Re: Execution efficiency slows down as the number of CPU cores increases

Posted by Mich Talebzadeh <mi...@gmail.com>.
Hi,

in answer to your question below

You have this setup:

 Our spark cluster is deployed on a physical machine with a total of 365GB
of memory, 56 CPU cores, and one worker. There is no Yarn service, and the
standalone mode is used. HDFS is also deployed on the same machine, which
has 4 mechanical hard disks (not ssd), and the data(1TB) is distributed
among these 4 hard disks

OK, in the file ${SPARK_HOME}/conf/workers you can assign more workers to your
host. Here is an example of mine:

 cat workers

# A Spark Worker will be started on each of the machines listed below.
rhes75
rhes75
rhes75
rhes75
rhes76
rhes76
rhes76
rhes76
rhes564

So I have 4 workers on host rhes75, 4 workers on host rhes76, and one worker
on rhes564. When you run spark-submit in standalone mode, you will have 9
executors here. In the same way, you can configure as many workers on your
host as you need.

    ${SPARK_HOME}/bin/spark-submit \
                --driver-memory 24G \
                --executor-memory 8G \
                --num-executors 4 \
                --executor-cores 4 \
                --master spark://<HOST>:7077 \
..

etc

Next, to your question:

".. there is one question that confuses me too. In the 4 tests, why does
the execution time of the first batch of tasks in the stage of each test
application take longer than the execution time of subsequent tasks? The
execution times of subsequent tasks tend to be consistent? Would you like
to help me  to find the reason for this? I will be very grateful.  "


My view is that one factor behind this behaviour is that, when you fetch the
data for the first time, you start from a cold file system cache. The first
stage performs physical IOs (PIO), reading data from the disks into the
cache, for your SQL statement. PIO is about 18-20 times slower than logical
IO (LIO). In the next stage most of the data is already in the file cache,
so it is mostly doing LIO (reading from the file cache). That is consistent
with later stages reading mostly from memory blocks and hence showing
uniform response times.


HTH



   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Sun, 13 Feb 2022 at 14:10, Sean Owen <sr...@gmail.com> wrote:

> Is this a problem? You're getting faster times with more cores.
> See last email - CPU throttling for example?
> Indeed more tasks with a shuffle might actually slow down one part of it.
> More work is happening in parallel in the longer wall clock time.
>
> On Sun, Feb 13, 2022 at 7:48 AM 15927907987@163.com <15...@163.com>
> wrote:
>
>> @Mich Talebzadeh @Sean Owen
>> Thank you for your suggestions and help. I have also scanned the effect
>> of various other parameters on execution time, such as executor-memory,
>> driver-memory, and parallelism. These parameters only matter when their
>> values are small; once they reach a certain value, they have no further
>> effect on the execution time of each task. Only the number of CPU cores
>> has a large impact on task execution time. Also, could you specify what
>> Spark cluster information I need to provide?
>> Our Spark cluster is deployed on a physical machine with a total of 365GB
>> of memory, 56 CPU cores, and one worker. There is no YARN service, and
>> standalone mode is used. HDFS is also deployed on the same machine,
>> which has 4 mechanical hard disks (not SSDs), and the data (1TB) is
>> distributed among these 4 hard disks. When the application is running,
>> we monitor the IO, and there is no IO bottleneck.
>> Also, as I mentioned last time, there is one more question that confuses
>> me. In the 4 tests, why does the first batch of tasks in the stage of each
>> test application take longer than subsequent tasks, while the execution
>> times of subsequent tasks tend to be consistent? Could you help me find
>> the reason for this? I would be very grateful.
>> Sorry, the average task time column in the table was reversed last
>> time; the correct data record is shown below.
>> As far as I know, increasing the number of active CPU cores sometimes
>> lowers the turbo frequency during execution. Is this one of the reasons
>> for the increase in task execution time?
>> Looking forward to your reply.
>>
>> ------------------------------
>> 15927907987@163.com
>>
>>
>> *From:* Mich Talebzadeh <mi...@gmail.com>
>> *Date:* 2022-02-11 00:48
>> *To:*
>> *CC:* 15927907987@163.com; user <us...@spark.apache.org>
>> *Subject:* Re: Execution efficiency slows down as the number of CPU
>> cores increases
>>
>> Hi,
>>
>>
>> Can you please provide some more info about your Spark cluster setup? You
>> mentioned Hadoop as the underlying storage. I assume that there is data
>> locality between your Spark cluster and the underlying Hadoop.
>>
>>
>> In your SQL statement below
>>
>> select count(*) from (
>>         select *distinct* c_last_name, c_first_name, d_date
>>
>> you will have to shuffle because of the distinct: each worker has to
>> read its data separately and perform a reduce task to get its local
>> distinct values, followed by one final shuffle to get the actual distinct
>> values across all the data.
>>
>> Increasing the number of cores implies increasing parallel action on the
>> underlying data for the same executor. So there would be more distinct jobs
>> here. My suggestion is that you plot the execution time of tasks (y-axis)
>> against the parameters of interest (x-axis) so that the trend is easier
>> to grasp. You can use Excel for it.
>>
>> HTH
>>
>>
>> On Thu, 10 Feb 2022 at 13:16, Sean Owen <sr...@gmail.com> wrote:
>>
>>> More cores is finishing faster as expected. My guess is that you are
>>> getting more parallelism overall and that speeds things up. However with
>>> more tasks executing concurrently on one machine, you are getting some
>>> contention, so it's possible more tasks are taking longer - a little I/O
>>> contention, CPU throttling under load, etc. Could be other reasons but this
>>> is possibly quite normal.

Re: Re: Execution efficiency slows down as the number of CPU cores increases

Posted by Sean Owen <sr...@gmail.com>.
Is this a problem? You're getting faster times with more cores.
See last email - CPU throttling for example?
Indeed more tasks with a shuffle might actually slow down one part of it.
More work is happening in parallel in the longer wall clock time.
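
A rough way to see that more work is being done per second even though each task is slower is to compute aggregate task throughput from the figures in the original post. The Python sketch below does that, and also estimates the stage time from a simple "slow first wave plus faster later waves" model. It assumes that stage 4 runs 100 tasks (one per shuffle partition, from spark.sql.shuffle.partitions=100), which the thread does not confirm. Ranges such as 5-6s are replaced by their midpoints, and the average task times use the corrected order 3s, 4s, 5s, 7s. The names CASES, ASSUMED_TASKS, tasks_per_second, waves, and estimated_stage_time are hypothetical.

```python
import math

# Figures from the original post. Task durations given as ranges are replaced
# by their midpoints, and the average task times use the corrected order.
# Columns: total cores, avg task time (s), first-batch task time (s), later task time (s)
CASES = [
    (8, 3.0, 5.5, 3.5),
    (16, 4.0, 7.5, 4.0),
    (32, 5.0, 11.0, 5.0),
    (48, 7.0, 14.5, 6.0),
]

# ASSUMPTION: stage 4 runs one task per shuffle partition
# (spark.sql.shuffle.partitions=100); the thread does not confirm this.
ASSUMED_TASKS = 100


def tasks_per_second(cores, avg_task_s):
    """Aggregate throughput when every core is busy running one task."""
    return cores / avg_task_s


def waves(tasks, cores):
    """Number of scheduling rounds needed to run all tasks on the given cores."""
    return math.ceil(tasks / cores)


def estimated_stage_time(tasks, cores, first_s, later_s):
    """One slow first wave followed by the remaining, faster waves."""
    return first_s + (waves(tasks, cores) - 1) * later_s


for cores, avg_s, first_s, later_s in CASES:
    print(f"{cores:>2} cores: {tasks_per_second(cores, avg_s):.2f} tasks/s, "
          f"{waves(ASSUMED_TASKS, cores)} waves, "
          f"estimated stage time {estimated_stage_time(ASSUMED_TASKS, cores, first_s, later_s):.1f}s")
```

Under these assumptions throughput climbs from about 2.7 tasks per second on 8 cores to about 6.9 on 48 cores, but it barely moves between 32 cores (6.4) and 48 cores. With 100 tasks, 48 cores need 3 waves and the last wave runs only 4 tasks, so most cores sit idle at the end. The estimated stage times (47.5s, 31.5s, 26.0s, 26.5s) are in the same range as the reported ones, which suggests, without proving it, that the slow first wave and the small number of waves account for much of the plateau.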

On Sun, Feb 13, 2022 at 7:48 AM 15927907987@163.com <15...@163.com>
wrote:

> @Mich Talebzadeh @Sean Owen
> Thank you for your suggestion and help. I also tried to scan the
> time-consuming effects of various parameters in the past, such as
> executor-memory, driver-memory, parallelism, etc. These parameters will
> only affect when the value is small, and increase to a certain value, it
> has no effect on the execution time of each task. It is only found that
> the increase in the number of cpu cores has a great impact on the task
> execution time. Also, would you like to specify what spark cluster
> information I need to provide?
> Our spark cluster is deployed on a physical machine with a total of 365GB
> of memory, 56 CPU cores, and one worker. There is no Yarn service, and
> the standalone mode is used. HDFS is also deployed on the same machine,
> which has 4 mechanical hard disks (not ssd), and the data(1TB) is
> distributed among these 4 hard disks. When the application is running, we
> monitor the IO, and there is no IO bottleneck.
> Also, as I mentioned last time, there is one question that confuses me too. In
> the 4 tests, why does the execution time of the first batch of tasks in the
> stage of each test application take longer than the execution time of
> subsequent tasks? The execution times of subsequent tasks tend to be
> consistent? Would you like to help me  to find the reason for this? I
> will be very grateful.
> Sorry that the average task time column in the table was reversed last
> time, the correct data record is shown below.
> As far as I know, the increase in the number of CPU cores will sometimes
> lead to a decrease in the turbo frequency during CPU execution. Is this one
> of the reasons for the increase in task execution time?
> Looking forward to your reply.
>
> ------------------------------
> 15927907987@163.com
>
>
> *From:* Mich Talebzadeh <mi...@gmail.com>
> *Date:* 2022-02-11 00:48
> *To:*
> *CC:* 15927907987@163.com; user <us...@spark.apache.org>
> *Subject:* Re: Execution efficiency slows down as the number of CPU cores
> increases
>
> Hi,
>
>
> Can Please provide some more info about your Spark cluster setup. You
> mentioned Hadoop as the underlying storage. I assume that there is data
> locality between your Spark cluster and the
>
> the underlying hadoop.
>
>
> In your SQL statement below
>
> select count(*) from (
>         select *distinct* c_last_name, c_first_name, d_date
> You will have to shuffle because of distinct as each worker will have to
> read data separately and perform the reduce task to get the local distinct
> value and one final shuffle to get the actual distinct
> for all the data.
>
> Increasing the number of cores implies increasing parallel action on the
> underlying data for the same executor. So there would be more distinct jobs
> here. My view would be that you plot the execution time
> of tasks (y-axis) with x-axis being parameters of interest so it would be
> easier to grab it. You can use Excel for it.
>
> HTH
>
>
>
>    view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Thu, 10 Feb 2022 at 13:16, Sean Owen <sr...@gmail.com> wrote:
>
>> More cores is finishing faster as expected. My guess is that you are
>> getting more parallelism overall and that speeds things up. However with
>> more tasks executing concurrently on one machine, you are getting some
>> contention, so it's possible more tasks are taking longer - a little I/O
>> contention, CPU throttling under load, etc. Could be other reasons but this
>> is possibly quite normal.
>>
>> On Thu, Feb 10, 2022 at 7:02 AM 15927907987@163.com <15...@163.com>
>> wrote:
>>
>>> Hello,
>>>     I recently used spark3.2 to do a test based on the TPC-DS dataset,
>>> and the entire TPC-DS data scale is 1TB(located in HDFS). But I encountered
>>> a problem that I couldn't understand, and I hope to get your help.
>>>     The SQL statement tested is as follows:
>>>      select count(*) from (
>>>         select distinct c_last_name, c_first_name, d_date
>>>          from store_sales, data_dim, customer
>>>                     where store_sales.ss_sold_date_sk =
>>> date_dim.d_date_sk
>>>              and store_sales.ss_customer_sk = customer.c_customer_sk
>>>              and d_month_seq between 1200 and 1200 + 11
>>>     )hot_cust
>>>      limit 100;
>>>
>>> Three tables are used here, and their sizes are: store_sales(387.97GB),
>>> date_dim(9.84GB),customer(1.52GB)
>>>
>>> When submitting the application, the driver and executor parameters are
>>> set as follows:
>>> DRIVER_OPTIONS="--master spark://xx.xx.xx.xx:7078 --total-executor-cores
>>> 48 --conf spark.driver.memory=100g --conf spark.default.parallelism=100
>>> --conf spark.sql.adaptive.enabled=false"
>>>  EXECUTOR_OPTIONS="--executor-cores 12 --conf spark.executor.memory=50g
>>> --conf spark.task.cpus=1 --conf spark.sql.shuffle.partitions=100 --conf
>>> spark.sql.crossJoin.enabled=true"
>>>
>>> The Spark cluster is built on a physical node with only one worker, and
>>> the submitted tasks are in StandAlone mode. A total of 56 cores, 376GB
>>> of memory. I kept executor-num=4 unchanged (that is, the total
>>> executors remained unchanged), and adjusted executor-cores=2, 4, 8, 12. The
>>> corresponding application time was: 96s, 66s, 50s, 49s. It was found that
>>> the total- executor-cores >=32, the time-consuming is almost unchanged.
>>> I monitored CPU utilization, IO, and found no bottlenecks. Then I
>>> analyzed the task time in the longest stage of each application in the
>>> spark web UI interface(The longest time-consuming stage in the four
>>> test cases is stage4, and the corresponding time-consuming: 48s, 28s, 22s,
>>> 23s). We see that with the increase of cpu cores, the task time in
>>> stage4 will also increase. The average task time is: 3s, 4s, 5s, 7s. And
>>> each executor takes longer to run the first batch of tasks than subsequent
>>> tasks. The more cores there are, the greater the time gap:
>>>
>>> Test-Case-ID total-executor-cores executor-cores total application time  stage4
>>> time average task time  The first batch of tasks takes time  The
>>> remaining batch tasks take time
>>> 1 8 2 96s 48s 7s 5-6s 3-4s
>>> 2 16 4 66s 28s 5s 7-8s ~4s
>>> 3 32 8 50s 23s 4s 10-12s ~5s
>>> 4 48 12 49s 22s 3s 14-15s ~6s
>>>
>>> This result confuses me, why after increasing the number of cores, the
>>> execution time of tasks becomes longer. I compared the data size processed
>>> by each task in different cases, they are all consistent. Or why is the
>>> execution efficiency almost unchanged after the number of cores increases
>>> to a certain number.
>>> Looking forward to your reply.
>>>
>>> ------------------------------
>>> 15927907987@163.com
>>>
>>

Re: Re: Execution efficiency slows down as the number of CPU cores increases

Posted by Sean Owen <sr...@gmail.com>.
Is this a problem? you're getting faster times with more cores.
See last email - CPU throttling for example?
Indeed more tasks with a shuffle might actually slow down one part of it.
More work is happening in parallel in the longer wall clock time.

On Sun, Feb 13, 2022 at 7:48 AM 15927907987@163.com <15...@163.com>
wrote:

> @Mich Talebzadeh @Sean Owen
> Thank you for your suggestions and help. I have also previously measured
> how various parameters, such as executor-memory, driver-memory, and
> parallelism, affect execution time. These parameters only matter when
> their values are small; once they are increased past a certain value,
> they have no further effect on the execution time of each task. Only
> the number of CPU cores has a large impact on task execution time.
> Also, could you specify what Spark cluster information I need to provide?
> Our Spark cluster is deployed on a physical machine with a total of 365GB
> of memory, 56 CPU cores, and one worker. There is no YARN service;
> standalone mode is used. HDFS is deployed on the same machine, which has
> 4 mechanical hard disks (not SSDs), and the data (1TB) is distributed
> across these 4 disks. While the application was running, we monitored
> I/O and saw no I/O bottleneck.
> As I mentioned last time, there is another question that confuses me. In
> all 4 tests, why does the first batch of tasks in the stage take longer
> than subsequent tasks, whose execution times tend to be consistent?
> Could you help me find the reason for this? I would be very grateful.
> Sorry that the average task time column in the table was reversed last
> time; the correct data record is shown below.
> As far as I know, using more CPU cores can sometimes lower the turbo
> frequency during execution. Is this one of the reasons for the increase
> in task execution time?
> Looking forward to your reply.
>
> ------------------------------
> 15927907987@163.com
>
>

Re: Execution efficiency slows down as the number of CPU cores increases

Posted by Mich Talebzadeh <mi...@gmail.com>.
Hi,


Can you please provide some more info about your Spark cluster setup? You
mentioned Hadoop as the underlying storage. I assume that there is data
locality between your Spark cluster and the underlying Hadoop.


In your SQL statement below

select count(*) from (
        select *distinct* c_last_name, c_first_name, d_date
You will have to shuffle because of distinct: each worker has to read its
data separately and perform a reduce step to get the local distinct values,
followed by one final shuffle to get the actual distinct values across all
the data.
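
As a rough illustration of that two-step pattern, here is a plain-Python
sketch. It is not Spark's actual implementation; the function name, the
sample rows, and the use of Python's built-in hash to pick a reducer are all
assumptions made for the example.

```python
from collections import defaultdict


def distinct_two_phase(partitions, num_reducers):
    """Count distinct rows with a local pass, a shuffle, and a final pass."""
    # Phase 1: each map task removes duplicates inside its own partition.
    local_distinct = [set(rows) for rows in partitions]

    # Shuffle: route every surviving row to a reducer chosen by its hash,
    # so equal rows from different partitions land in the same reducer.
    reducers = defaultdict(set)
    for rows in local_distinct:
        for row in rows:
            reducers[hash(row) % num_reducers].add(row)

    # Phase 2: each reducer holds a duplicate-free set; the sum of the
    # set sizes is the result of count(*) over the distinct rows.
    return sum(len(rows) for rows in reducers.values())


# Hypothetical (c_last_name, c_first_name, d_date) rows in two partitions.
sample_partitions = [
    [("Smith", "Ann", "2000-01-01"), ("Smith", "Ann", "2000-01-01"),
     ("Lee", "Bo", "2000-01-02")],
    [("Smith", "Ann", "2000-01-01"), ("Kim", "Cy", "2000-01-03")],
]
distinct_count = distinct_two_phase(sample_partitions, num_reducers=4)
print(distinct_count)
```

The local pass shrinks the data before it crosses the shuffle; the shuffle
is still needed because the same row can appear in more than one partition.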

Increasing the number of cores implies more parallel activity on the
underlying data within the same executor, so there would be more distinct
jobs running at once. My suggestion is to plot the execution time of tasks
(y-axis) against the parameters of interest (x-axis), which would make the
pattern easier to grasp. You can use Excel for it.

HTH



   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.





Re: Execution efficiency slows down as the number of CPU cores increases

Posted by Sean Owen <sr...@gmail.com>.
More cores are finishing faster, as expected. My guess is that you are
getting more parallelism overall and that speeds things up. However, with
more tasks executing concurrently on one machine, you are getting some
contention, so it's possible that individual tasks are taking longer: a
little I/O contention, CPU throttling under load, etc. There could be other
reasons, but this is possibly quite normal.
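
For a sense of scale, the reported application times (96s, 66s, 50s, 49s at
8, 16, 32, 48 total executor cores) can be turned into speedup and parallel
efficiency relative to the 8-core run. This is only arithmetic on the numbers
from the original post; treating the 8-core run as the baseline and the
function name are choices made for this sketch.

```python
total_cores = [8, 16, 32, 48]
app_seconds = [96, 66, 50, 49]  # total application times from the post


def scaling(cores, seconds):
    """Return (cores, speedup, efficiency) relative to the first run."""
    base_cores, base_seconds = cores[0], seconds[0]
    rows = []
    for c, s in zip(cores, seconds):
        speedup = base_seconds / s   # how much faster than the baseline
        ideal = c / base_cores       # speedup if scaling were linear
        rows.append((c, speedup, speedup / ideal))
    return rows


scaling_rows = scaling(total_cores, app_seconds)
for c, speedup, efficiency in scaling_rows:
    print(f"{c:>2} cores: speedup {speedup:.2f}x, efficiency {efficiency:.0%}")
```

Speedup keeps growing with more cores, but efficiency falls steadily, which
is the usual signature of contention on a shared machine.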
