Posted to dev@beam.apache.org by "devinduan (段丁瑞)" <de...@tencent.com> on 2018/09/18 06:39:19 UTC

How to optimize the performance of Beam on Spark

Hi,
    I'm testing Beam on Spark.
    I ran the Spark example WordCount on a 1 GB data file; it took 1 minute.
    However, the Beam example WordCount on the same file took 30 minutes.
    My Spark parameters are: --deploy-mode client --executor-memory 1g --num-executors 1 --driver-memory 1g
    My Spark version is 2.3.1; my Beam version is 2.5.
    Is there any way to optimize this?
Thank you.



Re: Re: How to optimize the performance of Beam on Spark(Internet mail)

Posted by Tim Robertson <ti...@gmail.com>.
Thanks for sharing those results.

The second set (executors at 20-30) looks similar to what I would have
expected.
BEAM-5036 definitely plays a part here as the data is not moved on HDFS
efficiently (fix in PR awaiting review now [1]).

To give an idea of the impact, here are some numbers from my own tests.
Without knowing your code, I presume mine is similar to your filter (take
data, modify it, write data with no shuffle/group/join).

My environment is a 10-node YARN CDH 5.12.2 cluster. Rewriting a 1.5TB Avro
file with AvroIO (code here [2]), I observed:

  - Using Spark API: 35 minutes
  - Beam AvroIO (2.6.0): 1.7hrs
  - Beam AvroIO with the 5036 fix: 42 minutes
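
To put the three runtimes above on a common scale, here is a minimal sketch that expresses each one as a multiple of the Spark API runtime. It only uses the numbers listed above (1.7 hours is converted to minutes); the variable names are illustrative and not taken from the linked code.

```python
# Runtimes listed above for rewriting the 1.5TB Avro file, in minutes.
runtimes_min = {
    "Spark API": 35.0,
    "Beam AvroIO (2.6.0)": 1.7 * 60,  # reported as 1.7 hours
    "Beam AvroIO with the 5036 fix": 42.0,
}

# Express every runtime as a multiple of the Spark API baseline.
baseline = runtimes_min["Spark API"]
slowdown = {name: minutes / baseline for name, minutes in runtimes_min.items()}

for name, factor in slowdown.items():
    print(f"{name}: {runtimes_min[name]:.0f} min, {factor:.2f}x the Spark API runtime")
```

On these figures the unpatched Beam run takes close to three times as long as the Spark API run, and the patched run takes about 1.2 times as long.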

Related: I also anticipate that varying the spark.default.parallelism will
affect Beam runtime.

Thanks,
Tim


[1] https://github.com/apache/beam/pull/6289
[2] https://github.com/gbif/beam-perf/tree/master/avro-to-avro


On Fri, Sep 28, 2018 at 9:27 AM Robert Bradshaw <ro...@google.com> wrote:

> Something here on the Beam side is clearly linear in the input size, as if
> there's a bottleneck where we're not able to get any parallelization. Is
> the spark variant running in parallel?
>
> On Fri, Sep 28, 2018 at 4:57 AM devinduan(段丁瑞) <de...@tencent.com>
> wrote:

Re: Re: How to optimize the performance of Beam on Spark(Internet mail)

Posted by Robert Bradshaw <ro...@google.com>.
Something here on the Beam side is clearly linear in the input size, as if
there's a bottleneck where we're not able to get any parallelization. Is
the spark variant running in parallel?
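
The scaling observation can be checked with a small sketch. It uses the single-executor WordCount timings devinduan reported in this thread (300MB, 600MB and 1.2G inputs), treats 1.2G as 1200 MB, and converts all times to seconds; those conversions and the helper names are assumptions made for illustration.

```python
# Single-executor WordCount timings reported in this thread, in seconds.
input_mb = [300, 600, 1200]            # 1.2G taken as 1200 MB (assumption)
spark_s = [68, 71, 78]                 # 1min8s, 1min11s, 1min18s
beam_s = [6.4 * 60, 11 * 60, 22 * 60]  # 6.4min, 11min, 22min


def growth(times):
    """Runtime of each run relative to the run on the smallest input."""
    return [t / times[0] for t in times]


def seconds_per_mb(times):
    """Runtime divided by input size for each run."""
    return [t / mb for t, mb in zip(times, input_mb)]


input_growth = growth(input_mb)
spark_growth = growth(spark_s)
beam_growth = growth(beam_s)
beam_s_per_mb = seconds_per_mb(beam_s)

for mb, g_in, g_spark, g_beam in zip(input_mb, input_growth, spark_growth, beam_growth):
    print(f"{mb} MB: input x{g_in:.1f}, Spark x{g_spark:.2f}, Beam x{g_beam:.2f}")
```

When the input grows fourfold, the Spark runtime grows by well under 20 percent while the Beam runtime grows by more than three times, and the Beam seconds-per-MB figure stays roughly constant. That is what runtime linear in the input size looks like.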


Re: Re: How to optimize the performance of Beam on Spark(Internet mail)

Posted by "devinduan (段丁瑞)" <de...@tencent.com>.
Hi
    I have completed my test.
1. Spark parameters:
deploy-mode client
executor-memory 1g
num-executors 1
driver-memory 1g

WordCount:

         300MB      600MB      1.2G
Spark    1min8s     1min11s    1min18s
Beam     6.4min     11min      22min

Filter:

         300MB      600MB      1.2G
Spark    1.2min     1.7min     2.8min
Beam     2.7min     4.1min     5.7min

GroupByKey + sum:

         300MB                   600MB      1.2G
Spark    3.6min
Beam     Failed, executor OOM

Union:

         300MB      600MB      1.2G
Spark    1.7min     2.6min     5.1min
Beam     3.6min     6.2min     11min


2. Spark parameters:
deploy-mode client
executor-memory 1g
driver-memory 1g
spark.dynamicAllocation.enabled         true
spark.dynamicAllocation.minExecutors    2
spark.dynamicAllocation.maxExecutors    30

WordCount:

         300MB                    600MB                    1.2G
Spark    53s, 9 executors         53s, 9 executors         53s, 9 executors
Beam     1.6min, 29 executors     2min, 30 executors       2.5min, 30 executors

Filter:

         300MB                    600MB                    1.2G
Spark    1.2min, 22 executors     1.3min, 23 executors     1.6min, 29 executors
Beam     1.8min, 22 executors     2.7min, 30 executors     2.5min, 30 executors

GroupByKey + sum:

         300MB                    600MB                    1.2G
Spark
Beam     Failed, executor OOM

Union:

         300MB                    600MB                    1.2G
Spark    2.3min, 30 executors     2.3min, 30 executors     2.3min, 30 executors
Beam     2.7min, 29 executors     3.4min, 30 executors     4.7min, 30 executors
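
For anyone who wants to reproduce the second configuration, here is a minimal sketch that assembles a spark-submit command line with the dynamic allocation settings listed in this message. The class name com.test.BeamTest comes from the command quoted elsewhere in this thread; the jar name beam-test.jar and the helper function are hypothetical. Depending on the cluster, dynamic allocation may also need the external shuffle service (spark.shuffle.service.enabled), which this thread does not mention.

```python
import shlex


def spark_submit_command(main_class, app_jar, conf,
                         executor_memory="1g", driver_memory="1g"):
    """Build a spark-submit argument list (hypothetical helper for illustration)."""
    args = [
        "spark-submit",
        "--master", "yarn",
        "--deploy-mode", "client",
        "--class", main_class,
        "--executor-memory", executor_memory,
        "--driver-memory", driver_memory,
    ]
    # Each Spark property is passed as its own --conf key=value pair.
    for key, value in conf.items():
        args += ["--conf", f"{key}={value}"]
    args.append(app_jar)
    return args


# Settings from the second test configuration; --num-executors is left out
# so that dynamic allocation decides the executor count.
dynamic_allocation = {
    "spark.dynamicAllocation.enabled": "true",
    "spark.dynamicAllocation.minExecutors": "2",
    "spark.dynamicAllocation.maxExecutors": "30",
}

# "beam-test.jar" is a placeholder jar name, not taken from the thread.
command = spark_submit_command("com.test.BeamTest", "beam-test.jar", dynamic_allocation)
print(shlex.join(command))
```

Keeping the settings in a dictionary makes it easy to run the same job under both configurations and change only the Spark properties between runs.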




From: devinduan(段丁瑞)<ma...@tencent.com>
Date: 2018-09-19 17:17
To: Tim Robertson<ma...@gmail.com>
CC: dev<ma...@beam.apache.org>; jb<ma...@nanthrax.net>
Subject: Re: Re: How to optimize the performance of Beam on Spark(Internet mail)
Got it.
I will also set "spark.dynamicAllocation.enabled=true" to test.


From: Tim Robertson<ma...@gmail.com>
Date: 2018-09-19 17:04
To: dev@beam.apache.org<ma...@beam.apache.org>
CC: jb@nanthrax.net<ma...@nanthrax.net>
Subject: Re: Re: How to optimize the performance of Beam on Spark(Internet mail)
Thank you Devin

Can you also please try Beam with more spark executors if you are able?

On Wed, Sep 19, 2018 at 10:47 AM devinduan(段丁瑞) <de...@tencent.com>> wrote:
Thanks for your help!
I will test other examples of Beam On Spark in the future and then feed back the results.
Regards
devin

From: Jean-Baptiste Onofré<ma...@nanthrax.net>
Date: 2018-09-19 16:32
To: devinduan(段丁瑞)<ma...@tencent.com>; dev<ma...@beam.apache.org>
Subject: Re: How to optimize the performance of Beam on Spark(Internet mail)

Thanks for the details.

I will take a look later tomorrow (I have another issue to investigate
on the Spark runner today for Beam 2.7.0 release).

Regards
JB

On 19/09/2018 08:31, devinduan(段丁瑞) wrote:
> Hi,
>     I test 300MB data file.
>     Use command like:
>     ./spark-submit --master yarn --deploy-mode client  --class
> com.test.BeamTest --executor-memory 1g --num-executors 1 --driver-memory 1g
>
>  I set only one executor, so tasks run in sequence. One task takes 10s.
> However, a Spark task takes only 0.4s.
>
>
>
>     *From:* Jean-Baptiste Onofré <ma...@nanthrax.net>
>     *Date:* 2018-09-19 12:22
>     *To:* dev@beam.apache.org<ma...@beam.apache.org> <ma...@beam.apache.org>
>     *Subject:* Re: How to optimize the performance of Beam on
>     Spark(Internet mail)
>
>     Hi,
>
>     did you compare the stages in the Spark UI in order to identify which
>     stage is taking time ?
>
>     You use spark-submit in both cases for the bootstrapping ?
>
>     I will do a test here as well.
>
>     Regards
>     JB
>
>     On 19/09/2018 05:34, devinduan(段丁瑞) wrote:
>     > Hi,
>     >     Thanks for your reply.
>     >     Our team plans to use Beam instead of Spark, so I'm testing the
>     > performance of the Beam API.
>     >     I'm writing some examples with both the Spark API and the Beam API, such as
>     > "WordCount", "Join", "OrderBy", "Union" ...
>     >     I use the same resources and configuration to run these jobs.
>     >     Tim said I should remove "withNumShards(1)" and
>     > set spark.default.parallelism=32. I did that and tried again, but the
>     > Beam job still runs very slowly.
>     >     Here is My Beam code and Spark code:
>     >    Beam "WordCount":
>     >
>     >    Spark "WordCount":
>     >
>     >    I will try the other example later.
>     >
>     > Regards
>     > devin
>     >
>     >
>     >     *From:* Jean-Baptiste Onofré <ma...@nanthrax.net>
>     >     *Date:* 2018-09-18 22:43
>     >     *To:* dev@beam.apache.org<ma...@beam.apache.org> <ma...@beam.apache.org>
>     >     *Subject:* Re: How to optimize the performance of Beam on
>     >     Spark(Internet mail)
>     >
>     >     Hi,
>     >
>     >     The first huge difference is the fact that the spark runner
>     still uses
>     >     RDD whereas directly using spark, you are using dataset. A
>     bunch of
>     >     optimization in spark are related to dataset.
>     >
>     >     I started a large refactoring of the spark runner to leverage
>     Spark 2.x
>     >     (and dataset).
>     >     It's not yet ready as it includes other improvements (the
>     portability
>     >     layer with Job API, a first check of state API, ...).
>     >
>     >     Anyway, by Spark wordcount, you mean the one included in the spark
>     >     distribution ?
>     >
>     >     Regards
>     >     JB
>     >
>     >     On 18/09/2018 08:39, devinduan(段丁瑞) wrote:
>     >     > Hi,
>     >     >     I'm testing Beam on Spark.
>     >     >     I use spark example code WordCount processing 1G data
>     file, cost 1
>     >     > minutes.
>     >     >     However, I use Beam example code WordCount processing
>     the same
>     >     file,
>     >     > cost 30minutes.
>     >     >     My Spark parameter is :  --deploy-mode client
>     >      --executor-memory 1g
>     >     > --num-executors 1 --driver-memory 1g
>     >     >     My Spark version is 2.3.1,  Beam version is 2.5
>     >     >     Is there any optimization method?
>     >     > Thank you.
>     >     >
>     >     >
>     >
>     >     --
>     >     Jean-Baptiste Onofré
>     >     jbonofre@apache.org<ma...@apache.org>
>     >     http://blog.nanthrax.net
>     >     Talend - http://www.talend.com
>     >
>
>     --
>     Jean-Baptiste Onofré
>     jbonofre@apache.org<ma...@apache.org>
>     http://blog.nanthrax.net
>     Talend - http://www.talend.com
>

--
Jean-Baptiste Onofré
jbonofre@apache.org<ma...@apache.org>
http://blog.nanthrax.net
Talend - http://www.talend.com


Re: Re: How to optimize the performance of Beam on Spark(Internet mail)

Posted by "devinduan (段丁瑞)" <de...@tencent.com>.
Got it.
I will also set "spark.dynamicAllocation.enabled=true" to test.



Re: Re: How to optimize the performance of Beam on Spark(Internet mail)

Posted by Tim Robertson <ti...@gmail.com>.
Thank you Devin

Can you also please try Beam with more spark executors if you are able?


Re: Re: How to optimize the performance of Beam on Spark(Internet mail)

Posted by "devinduan (段丁瑞)" <de...@tencent.com>.
Thanks for your help!
I will test other examples of Beam On Spark in the future and then feed back the results.
Regards
devin



Re: How to optimize the performance of Beam on Spark(Internet mail)

Posted by Jean-Baptiste Onofré <jb...@nanthrax.net>.
Thanks for the details.

I will take a look later tomorrow (I have another issue to investigate
on the Spark runner today for Beam 2.7.0 release).

Regards
JB


-- 
Jean-Baptiste Onofré
jbonofre@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com

Re: Re: How to optimize the performance of Beam on Spark(Internet mail)

Posted by "devinduan (段丁瑞)" <de...@tencent.com>.
Hi,
    I tested with a 300MB data file, using a command like:
    ./spark-submit --master yarn --deploy-mode client  --class com.test.BeamTest --executor-memory 1g --num-executors 1 --driver-memory 1g
    I set only one executor, so tasks run in sequence. One task in the Beam job takes 10s.
    However, a Spark task takes only 0.4s.
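
To show why per-task time matters so much with a single executor, here is a rough sketch of the arithmetic. It assumes tasks of equal length that run in waves across the available slots. The task count of 40 is hypothetical, since this thread does not state it; only the 10s and 0.4s per-task times come from the message above.

```python
import math


def estimated_wall_clock_s(num_tasks, task_seconds, parallel_slots=1):
    """Estimate stage time when equal-length tasks run in waves across the slots."""
    waves = math.ceil(num_tasks / parallel_slots)
    return waves * task_seconds


NUM_TASKS = 40      # hypothetical task count, not reported in the thread
BEAM_TASK_S = 10.0  # per-task time observed for the Beam job
SPARK_TASK_S = 0.4  # per-task time observed for the Spark job

beam_one_slot = estimated_wall_clock_s(NUM_TASKS, BEAM_TASK_S)
spark_one_slot = estimated_wall_clock_s(NUM_TASKS, SPARK_TASK_S)
beam_thirty_slots = estimated_wall_clock_s(NUM_TASKS, BEAM_TASK_S, parallel_slots=30)

print(f"Beam, 1 slot:   {beam_one_slot:.0f}s")
print(f"Spark, 1 slot:  {spark_one_slot:.0f}s")
print(f"Beam, 30 slots: {beam_thirty_slots:.0f}s")
```

Under this simple model, a single slot carries the full 25x per-task gap through to the total runtime, while more slots shorten the wall-clock time without changing the cost of each task.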




Re: How to optimize the performance of Beam on Spark(Internet mail)

Posted by Jean-Baptiste Onofré <jb...@nanthrax.net>.
Hi,

Did you compare the stages in the Spark UI to identify which
stage is taking the time?

Do you use spark-submit in both cases for the bootstrapping?

I will do a test here as well.

Regards
JB

On 19/09/2018 05:34, devinduan(段丁瑞) wrote:
> Hi,
>     Thanks for you reply.
>     Our team plan to use Beam instead of Spark, So I'm testing the
> performance of Beam API.
>     I'm coding some example through Spark API and Beam API , like
> "WordCount" , "Join",  "OrderBy",  "Union" ...
>     I use the same Resources and configuration to run these Job.   
>    Tim said I should remove "withNumShards(1)" and
> set spark.default.parallelism=32. I did it and tried again, but Beam job
> still running very slowly.
>     Here is My Beam code and Spark code:
>    Beam "WordCount":
>     
>    Spark "WordCount":
> 
>    I will try the other example later.
>     
> Regards
> devin
> 
>      
>     *From:* Jean-Baptiste Onofré <ma...@nanthrax.net>
>     *Date:* 2018-09-18 22:43
>     *To:* dev@beam.apache.org <ma...@beam.apache.org>
>     *Subject:* Re: How to optimize the performance of Beam on
>     Spark(Internet mail)
> 
>     Hi,
> 
>     The first huge difference is the fact that the spark runner still uses
>     RDD whereas directly using spark, you are using dataset. A bunch of
>     optimization in spark are related to dataset.
> 
>     I started a large refactoring of the spark runner to leverage Spark 2.x
>     (and dataset).
>     It's not yet ready as it includes other improvements (the portability
>     layer with Job API, a first check of state API, ...).
> 
>     Anyway, by Spark wordcount, you mean the one included in the spark
>     distribution ?
> 
>     Regards
>     JB
> 
>     On 18/09/2018 08:39, devinduan(段丁瑞) wrote:
>     > Hi,
>     >     I'm testing Beam on Spark. 
>     >     I use spark example code WordCount processing 1G data file, cost 1
>     > minutes.
>     >     However, I use Beam example code WordCount processing the same
>     file,
>     > cost 30minutes.
>     >     My Spark parameter is :  --deploy-mode client
>      --executor-memory 1g
>     > --num-executors 1 --driver-memory 1g
>     >     My Spark version is 2.3.1,  Beam version is 2.5
>     >     Is there any optimization method?
>     > Thank you.
>     >
>     >    
> 
>     --
>     Jean-Baptiste Onofré
>     jbonofre@apache.org
>     http://blog.nanthrax.net
>     Talend - http://www.talend.com
> 

-- 
Jean-Baptiste Onofré
jbonofre@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com

Re: Re: How to optimize the performance of Beam on Spark(Internet mail)

Posted by "devinduan (段丁瑞)" <de...@tencent.com>.
Hi,
    Thanks for your reply.
    Our team plans to use Beam instead of Spark, so I'm testing the performance of the Beam API.
    I'm coding some examples with both the Spark API and the Beam API, such as "WordCount", "Join", "OrderBy", "Union" ...
    I use the same resources and configuration to run these jobs.
    Tim said I should remove "withNumShards(1)" and set spark.default.parallelism=32. I did that and tried again, but the Beam job still runs very slowly.
    [image]
    Here is my Beam code and my Spark code:
    Beam "WordCount":
    [image: Beam WordCount code]
    Spark "WordCount":
    [image: Spark WordCount code]

    I will try the other examples later.

Regards
devin

From: Jean-Baptiste Onofré<ma...@nanthrax.net>
Date: 2018-09-18 22:43
To: dev@beam.apache.org<ma...@beam.apache.org>
Subject: Re: How to optimize the performance of Beam on Spark(Internet mail)

Hi,

The first huge difference is the fact that the spark runner still uses
RDD whereas directly using spark, you are using dataset. A bunch of
optimization in spark are related to dataset.

I started a large refactoring of the spark runner to leverage Spark 2.x
(and dataset).
It's not yet ready as it includes other improvements (the portability
layer with Job API, a first check of state API, ...).

Anyway, by Spark wordcount, you mean the one included in the spark
distribution ?

Regards
JB

On 18/09/2018 08:39, devinduan(段丁瑞) wrote:
> Hi,
>     I'm testing Beam on Spark.
>     I use spark example code WordCount processing 1G data file, cost 1
> minutes.
>     However, I use Beam example code WordCount processing the same file,
> cost 30minutes.
>     My Spark parameter is :  --deploy-mode client  --executor-memory 1g
> --num-executors 1 --driver-memory 1g
>     My Spark version is 2.3.1,  Beam version is 2.5
>     Is there any optimization method?
> Thank you.
>
>

--
Jean-Baptiste Onofré
jbonofre@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com


Re: How to optimize the performance of Beam on Spark

Posted by Jean-Baptiste Onofré <jb...@nanthrax.net>.
Hi,

The first big difference is that the Spark runner still uses RDDs,
whereas when using Spark directly you are using Datasets. Many of the
optimizations in Spark are tied to Datasets.

I started a large refactoring of the Spark runner to leverage Spark 2.x
(and Datasets).
It's not ready yet, as it includes other improvements (the portability
layer with the Job API, a first check of the state API, ...).

Anyway, by Spark wordcount, do you mean the one included in the Spark
distribution?

Regards
JB

On 18/09/2018 08:39, devinduan(段丁瑞) wrote:
> Hi,
>     I'm testing Beam on Spark. 
>     I use spark example code WordCount processing 1G data file, cost 1
> minutes.
>     However, I use Beam example code WordCount processing the same file,
> cost 30minutes.
>     My Spark parameter is :  --deploy-mode client  --executor-memory 1g
> --num-executors 1 --driver-memory 1g
>     My Spark version is 2.3.1,  Beam version is 2.5
>     Is there any optimization method?
> Thank you.
> 
>    

-- 
Jean-Baptiste Onofré
jbonofre@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com

Re: Re: How to optimize the performance of Beam on Spark(Internet mail)

Posted by Tim Robertson <ti...@gmail.com>.
Thanks for sharing all of that.

Can you please remove the "withNumShards(1)" to verify it performs the same
as the wordcount?
It adds an additional groupByKey, which you can read about in the Javadoc [1],
and it also means the write runs without parallelism.

You are running with a single executor, which seems unusual for a Spark
cluster. I would recommend running with several, and with
spark.default.parallelism set.

The issue that Robert linked does apply in your scenario and I believe
having only a single shard output will make that issue worse (it copies all
data through a single "worker").
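
As a rough sketch of that advice applied to the submit command quoted below:
the executor and core counts here are illustrative assumptions, not values
from this thread, and the script only prints the command so that it can be
reviewed before it is run.

```shell
#!/bin/sh
# Hypothetical sizing (assumption): 4 executors with 2 cores each.
NUM_EXECUTORS=4
EXECUTOR_CORES=2
# One task slot per core, so match the parallelism to the total core count.
PARALLELISM=$((NUM_EXECUTORS * EXECUTOR_CORES))

# Build the command as a string; the class and jar paths are the ones
# used in the thread, everything else is an assumption to be tuned.
CMD="./spark-submit --master yarn --deploy-mode client \
--class com.test.BeamTest \
--conf spark.default.parallelism=${PARALLELISM} \
--executor-memory 1g --executor-cores ${EXECUTOR_CORES} \
--num-executors ${NUM_EXECUTORS} --driver-memory 1g \
--jars /data/mapleleaf/tmp/beam.jar /data/mapleleaf/tmp/beam.jar"

printf '%s\n' "$CMD"
```

The same pattern should scale to larger clusters by changing the two
variables at the top.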

Thanks,
Tim

[1]
https://beam.apache.org/documentation/sdks/javadoc/2.3.0/org/apache/beam/sdk/io/WriteFiles.html

On Tue, Sep 18, 2018 at 11:21 AM devinduan(段丁瑞) <de...@tencent.com>
wrote:

> Hi Tim
>   Thanks for your reply.
>   I run beam example like :
>   ./spark-submit --master yarn --deploy-mode client  --class
> com.test.BeamTest --executor-memory 1g --num-executors 1 --driver-memory 1g
> --jars /data/mapleleaf/tmp/beam.jar  /data/mapleleaf/tmp/beam.jar
>    BeamTest  code copy from WordCount.  I removed some unused code
> (Metric update).
>
>   I run spark example:
>  ./spark-submit --master yarn --deploy-mode client --class
> org.apache.spark.examples.JavaWordCount --executor-memory 1g
> --num-executors 1 --driver-memory 1g
> /data/mapleleaf/spark/examples/jars/spark-examples_2.11-2.3.0.jar
> hdfs://hdfsTest/test/test.txt
>
>  You can see the result...
>
>
> I'm trying to figure out reason...
> Spark job tasks number is 32, task duration only 0.4s
> Beam job task numbers also 32, but task duration cost nearly 1min.
> You can compare two jobs DAG:
> Spark:
>
> Beam :
>
> *From:* Tim Robertson <ti...@gmail.com>
> *Date:* 2018-09-18 16:55
> *To:* dev@beam.apache.org
> *Subject:* Re: How to optimize the performance of Beam on Spark(Internet
> mail)
> Hi devinduan
>
> The known issues Robert links there are actually HDFS related and not
> specific to Spark.  The improvement we're seeking is that the final copy of
> the output file can be optimised by using a "move" instead of "copy", and I
> expect to have it fixed for Beam 2.8.0.  On a small dataset like this
> though, I don't think it will impact performance too much.
>
> Can you please elaborate on your deployment?  It looks like you are using
> a cluster (i.e. deploy-mode client) but are you using HDFS?
>
> I have access to a Cloudera CDH 5.12 Hadoop cluster and just ran an
> example word count as follows - I'll explain the parameters to tune below:
>
> 1) I generate some random data (using common Hadoop tools)
> hadoop jar /opt/cloudera/parcels/CDH/lib/hadoop-0.20-mapreduce/hadoop-examples.jar \
>   teragen \
>   -Dmapred.map.tasks=100 \
>   -Dmapred.map.tasks.speculative.execution=false \
>   10000000  \
>   /tmp/tera
>
> This puts 100 files totalling just under 1GB on which I will run the word
> count. They are stored in the HDFS filesystem.
>
> 2) Run the word count using Spark (2.3.x) and Beam 2.5.0
>
> In my cluster I have YARN to allocate resources, and an HDFS filesystem.
> This will be different if you run Spark as standalone, or on a cloud
> environment.
>
> spark2-submit \
>   --conf spark.default.parallelism=45 \
>   --class org.apache.beam.runners.spark.examples.WordCount \
>   --master yarn \
>   --executor-memory 2G \
>   --executor-cores 5 \
>   --num-executors 9 \
>   --jars beam-sdks-java-core-2.5.0.jar,beam-runners-core-construction-java-2.5.0.jar,beam-runners-core-java-2.5.0.jar,beam-sdks-java-io-hadoop-file-system-2.5.0.jar \
>   beam-runners-spark-2.5.0.jar \
>   --runner=SparkRunner \
>   --inputFile=hdfs:///tmp/tera/* \
>   --output=hdfs:///tmp/wordcount
>
> The jars I provide here are the minimum needed for running on HDFS with
> Spark and normally you'd build those into your project as an über jar.
>
> The important bits for tuning for performance are the following - these
> will be applicable for any Spark deployment (unless embedded):
>
>   spark.default.parallelism - controls the parallelism of the beam
> pipeline. In this case, how many workers are tokenizing the input data.
>   executor-memory, executor-cores, num-executors - controls the resources
> spark will use
>
> Note, that the parallelism of 45 means that the 5 cores in the 9 executors
> can all run concurrently (i.e. 5x9 = 45). When you get to very large
> datasets, you will likely have parallelism much higher.
>
> In this test I see around 20 seconds initial startup of Spark (copying
> jars, requesting resources from YARN, establishing the Spark context) but
> once up the job completes in a few seconds writing the output into 45 files
> (because of the parallelism). The files are named
> /tmp/wordcount-000*-of-00045.
>
> I hope this helps provide a few pointers, but if you elaborate on your
> environment we might be able to assist more.
>
> Best wishes,
> Tim
>
>
> On Tue, Sep 18, 2018 at 9:29 AM Robert Bradshaw <ro...@google.com>
> wrote:
>
>> There are known performance issues with Beam on Spark that are being
>> worked on, e.g. https://issues.apache.org/jira/browse/BEAM-5036 . It's
>> possible you're hitting something different, but would be worth
>> investigating. See also
>> https://lists.apache.org/list.html?dev@beam.apache.org:lte=1M:Performance%20of%20write
>>
>> On Tue, Sep 18, 2018 at 8:39 AM devinduan(段丁瑞) <de...@tencent.com>
>> wrote:
>>
>>> Hi,
>>>     I'm testing Beam on Spark.
>>>     I use spark example code WordCount processing 1G data file, cost 1
>>> minutes.
>>>     However, I use Beam example code WordCount processing the same file,
>>> cost 30minutes.
>>>     My Spark parameter is :  --deploy-mode client  --executor-memory 1g
>>> --num-executors 1 --driver-memory 1g
>>>     My Spark version is 2.3.1,  Beam version is 2.5
>>>     Is there any optimization method?
>>> Thank you.
>>>
>>>
>>>
>>

Re: Re: How to optimize the performance of Beam on Spark(Internet mail)

Posted by "devinduan (段丁瑞)" <de...@tencent.com>.
Hi Tim
  Thanks for your reply.
  I run the Beam example like this:
  ./spark-submit --master yarn --deploy-mode client  --class com.test.BeamTest --executor-memory 1g --num-executors 1 --driver-memory 1g --jars /data/mapleleaf/tmp/beam.jar  /data/mapleleaf/tmp/beam.jar
  The BeamTest code is copied from WordCount; I removed some unused code (the metric updates).
  [image]
  I run the Spark example like this:
 ./spark-submit --master yarn --deploy-mode client --class org.apache.spark.examples.JavaWordCount --executor-memory 1g --num-executors 1 --driver-memory 1g /data/mapleleaf/spark/examples/jars/spark-examples_2.11-2.3.0.jar hdfs://hdfsTest/test/test.txt

  You can see the results:
[image: job results]

I'm trying to figure out the reason.
The Spark job has 32 tasks, and each task takes only 0.4 s.
[image: Spark task durations]
The Beam job also has 32 tasks, but each task takes nearly 1 minute.
[image: Beam task durations]
You can compare the DAGs of the two jobs:
Spark:
[image: Spark job DAG]
Beam:
[image: Beam job DAG]

From: Tim Robertson<ma...@gmail.com>
Date: 2018-09-18 16:55
To: dev@beam.apache.org<ma...@beam.apache.org>
Subject: Re: How to optimize the performance of Beam on Spark(Internet mail)
Hi devinduan

The known issues Robert links there are actually HDFS related and not specific to Spark.  The improvement we're seeking is that the final copy of the output file can be optimised by using a "move" instead of "copy", and I expect to have it fixed for Beam 2.8.0.  On a small dataset like this, though, I don't think it will impact performance too much.

Can you please elaborate on your deployment?  It looks like you are using a cluster (i.e. deploy-mode client) but are you using HDFS?

I have access to a Cloudera CDH 5.12 Hadoop cluster and just ran an example word count as follows - I'll explain the parameters to tune below:

1) I generate some random data (using common Hadoop tools)
hadoop jar /opt/cloudera/parcels/CDH/lib/hadoop-0.20-mapreduce/hadoop-examples.jar \
  teragen \
  -Dmapred.map.tasks=100 \
  -Dmapred.map.tasks.speculative.execution=false \
  10000000  \
  /tmp/tera

This puts 100 files totalling just under 1GB on which I will run the word count. They are stored in the HDFS filesystem.

2) Run the word count using Spark (2.3.x) and Beam 2.5.0

In my cluster I have YARN to allocate resources, and an HDFS filesystem. This will be different if you run Spark as standalone, or on a cloud environment.

spark2-submit \
  --conf spark.default.parallelism=45 \
  --class org.apache.beam.runners.spark.examples.WordCount \
  --master yarn \
  --executor-memory 2G \
  --executor-cores 5 \
  --num-executors 9 \
  --jars beam-sdks-java-core-2.5.0.jar,beam-runners-core-construction-java-2.5.0.jar,beam-runners-core-java-2.5.0.jar,beam-sdks-java-io-hadoop-file-system-2.5.0.jar \
  beam-runners-spark-2.5.0.jar \
  --runner=SparkRunner \
  --inputFile=hdfs:///tmp/tera/* \
  --output=hdfs:///tmp/wordcount

The jars I provide here are the minimum needed for running on HDFS with Spark and normally you'd build those into your project as an über jar.

The important bits for tuning for performance are the following - these will be applicable for any Spark deployment (unless embedded):

  spark.default.parallelism - controls the parallelism of the beam pipeline. In this case, how many workers are tokenizing the input data.
  executor-memory, executor-cores, num-executors - controls the resources spark will use

Note, that the parallelism of 45 means that the 5 cores in the 9 executors can all run concurrently (i.e. 5x9 = 45). When you get to very large datasets, you will likely have parallelism much higher.

In this test I see around 20 seconds initial startup of Spark (copying jars, requesting resources from YARN, establishing the Spark context) but once up the job completes in a few seconds writing the output into 45 files (because of the parallelism). The files are named /tmp/wordcount-000*-of-00045.

I hope this helps provide a few pointers, but if you elaborate on your environment we might be able to assist more.

Best wishes,
Tim

On Tue, Sep 18, 2018 at 9:29 AM Robert Bradshaw <ro...@google.com>> wrote:
There are known performance issues with Beam on Spark that are being worked on, e.g. https://issues.apache.org/jira/browse/BEAM-5036 . It's possible you're hitting something different, but would be worth investigating. See also https://lists.apache.org/list.html?dev@beam.apache.org:lte=1M:Performance%20of%20write

On Tue, Sep 18, 2018 at 8:39 AM devinduan(段丁瑞) <de...@tencent.com>> wrote:
Hi,
    I'm testing Beam on Spark.
    I use spark example code WordCount processing 1G data file, cost 1 minutes.
    However, I use Beam example code WordCount processing the same file, cost 30minutes.
    My Spark parameter is :  --deploy-mode client  --executor-memory 1g --num-executors 1 --driver-memory 1g
    My Spark version is 2.3.1,  Beam version is 2.5
    Is there any optimization method?
Thank you.



Re: How to optimize the performance of Beam on Spark

Posted by Tim Robertson <ti...@gmail.com>.
Hi devinduan

The known issues Robert links there are actually HDFS related and not
specific to Spark.  The improvement we're seeking is that the final copy of
the output file can be optimised by using a "move" instead of "copy", and I
expect to have it fixed for Beam 2.8.0.  On a small dataset like this,
though, I don't think it will impact performance too much.

Can you please elaborate on your deployment?  It looks like you are using a
cluster (i.e. deploy-mode client) but are you using HDFS?

I have access to a Cloudera CDH 5.12 Hadoop cluster and just ran an example
word count as follows - I'll explain the parameters to tune below:

1) I generate some random data (using common Hadoop tools)
hadoop jar /opt/cloudera/parcels/CDH/lib/hadoop-0.20-mapreduce/hadoop-examples.jar \
  teragen \
  -Dmapred.map.tasks=100 \
  -Dmapred.map.tasks.speculative.execution=false \
  10000000  \
  /tmp/tera

This puts 100 files totalling just under 1GB on which I will run the word
count. They are stored in the HDFS filesystem.
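
The "just under 1GB" figure can be checked with a little arithmetic. This
sketch assumes teragen's fixed row size of 100 bytes, which is a property of
the tool rather than something stated in this message:

```shell
#!/bin/sh
ROWS=10000000        # row count passed to teragen above
ROW_BYTES=100        # assumption: teragen writes 100-byte rows
FILES=100            # from -Dmapred.map.tasks=100

TOTAL_BYTES=$((ROWS * ROW_BYTES))
TOTAL_MIB=$((TOTAL_BYTES / 1024 / 1024))   # integer division, rounds down
BYTES_PER_FILE=$((TOTAL_BYTES / FILES))

printf 'total: %d bytes (about %d MiB), %d bytes per file\n' \
  "$TOTAL_BYTES" "$TOTAL_MIB" "$BYTES_PER_FILE"
```

Under that assumption the input is 10^9 bytes, i.e. slightly less than one
binary gigabyte, split into files of roughly 10 MB each.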

2) Run the word count using Spark (2.3.x) and Beam 2.5.0

In my cluster I have YARN to allocate resources, and an HDFS filesystem.
This will be different if you run Spark as standalone, or on a cloud
environment.

spark2-submit \
  --conf spark.default.parallelism=45 \
  --class org.apache.beam.runners.spark.examples.WordCount \
  --master yarn \
  --executor-memory 2G \
  --executor-cores 5 \
  --num-executors 9 \
  --jars beam-sdks-java-core-2.5.0.jar,beam-runners-core-construction-java-2.5.0.jar,beam-runners-core-java-2.5.0.jar,beam-sdks-java-io-hadoop-file-system-2.5.0.jar \
  beam-runners-spark-2.5.0.jar \
  --runner=SparkRunner \
  --inputFile=hdfs:///tmp/tera/* \
  --output=hdfs:///tmp/wordcount

The jars I provide here are the minimum needed for running on HDFS with
Spark and normally you'd build those into your project as an über jar.

The important bits for tuning for performance are the following - these
will be applicable for any Spark deployment (unless embedded):

  spark.default.parallelism - controls the parallelism of the Beam
    pipeline; in this case, how many workers tokenize the input data.
  executor-memory, executor-cores, num-executors - control the resources
    Spark will use.

Note that a parallelism of 45 means that the 5 cores in each of the 9
executors can all run concurrently (i.e. 5x9 = 45). When you get to very
large datasets, you will likely want a much higher parallelism.

In this test I see around 20 seconds initial startup of Spark (copying
jars, requesting resources from YARN, establishing the Spark context) but
once up the job completes in a few seconds writing the output into 45 files
(because of the parallelism). The files are named
/tmp/wordcount-000*-of-00045.
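
To check that every shard was written, the expected file names can be
generated and compared against a listing of the output location. This is a
sketch only: the function name is hypothetical, and it assumes that shard
numbers start at 0 and follow the five-digit pattern seen in the names above.

```shell
#!/bin/sh
# shard_names PREFIX COUNT
# Prints one expected output file name per shard (hypothetical helper).
shard_names() {
  prefix=$1
  count=$2
  i=0
  while [ "$i" -lt "$count" ]; do
    # Assumed pattern: <prefix>-SSSSS-of-NNNNN, zero-padded to 5 digits.
    printf '%s-%05d-of-%05d\n' "$prefix" "$i" "$count"
    i=$((i + 1))
  done
}

shard_names /tmp/wordcount 45
```

The printed list can then be compared with the output of a command such as
hdfs dfs -ls '/tmp/wordcount-*'.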

I hope this helps provide a few pointers, but if you elaborate on your
environment we might be able to assist more.

Best wishes,
Tim

On Tue, Sep 18, 2018 at 9:29 AM Robert Bradshaw <ro...@google.com> wrote:

> There are known performance issues with Beam on Spark that are being
> worked on, e.g. https://issues.apache.org/jira/browse/BEAM-5036 . It's
> possible you're hitting something different, but would be worth
> investigating. See also
> https://lists.apache.org/list.html?dev@beam.apache.org:lte=1M:Performance%20of%20write
>
> On Tue, Sep 18, 2018 at 8:39 AM devinduan(段丁瑞) <de...@tencent.com>
> wrote:
>
>> Hi,
>>     I'm testing Beam on Spark.
>>     I use spark example code WordCount processing 1G data file, cost 1
>> minutes.
>>     However, I use Beam example code WordCount processing the same file,
>> cost 30minutes.
>>     My Spark parameter is :  --deploy-mode client  --executor-memory 1g
>> --num-executors 1 --driver-memory 1g
>>     My Spark version is 2.3.1,  Beam version is 2.5
>>     Is there any optimization method?
>> Thank you.
>>
>>
>>
>

Re: How to optimize the performance of Beam on Spark

Posted by Robert Bradshaw <ro...@google.com>.
There are known performance issues with Beam on Spark that are being worked
on, e.g. https://issues.apache.org/jira/browse/BEAM-5036 . It's possible
you're hitting something different, but it would be worth investigating. See
also
https://lists.apache.org/list.html?dev@beam.apache.org:lte=1M:Performance%20of%20write

On Tue, Sep 18, 2018 at 8:39 AM devinduan(段丁瑞) <de...@tencent.com>
wrote:

> Hi,
>     I'm testing Beam on Spark.
>     I use spark example code WordCount processing 1G data file, cost 1
> minutes.
>     However, I use Beam example code WordCount processing the same file,
> cost 30minutes.
>     My Spark parameter is :  --deploy-mode client  --executor-memory 1g
> --num-executors 1 --driver-memory 1g
>     My Spark version is 2.3.1,  Beam version is 2.5
>     Is there any optimization method?
> Thank you.
>
>
>