You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@beam.apache.org by "devinduan (段丁瑞)" <de...@tencent.com> on 2018/09/18 09:20:50 UTC

Re: Re: How to optimize the performance of Beam on Spark(Internet mail)

Hi Tim
  Thanks for your reply.
  I run beam example like :
  ./spark-submit --master yarn --deploy-mode client  --class com.test.BeamTest --executor-memory 1g --num-executors 1 --driver-memory 1g --jars /data/mapleleaf/tmp/beam.jar  /data/mapleleaf/tmp/beam.jar
   BeamTest  code copy from WordCount.  I removed some unused code (Metric update).
 [cid:_Foxmail.1@8926cecf-517d-2a20-918b-dc3b5d4866be]
  I run spark example:
 ./spark-submit --master yarn --deploy-mode client --class org.apache.spark.examples.JavaWordCount --executor-memory 1g --num-executors 1 --driver-memory 1g /data/mapleleaf/spark/examples/jars/spark-examples_2.11-2.3.0.jar hdfs://hdfsTest/test/test.txt

 You can see the result...
[cid:_Foxmail.1@33461416-fe6f-8dfb-2da2-243a9a0ed7aa]

I'm trying to figure out reason...
Spark job tasks number is 32, task duration only 0.4s
[cid:_Foxmail.1@a88708e0-648e-88ca-dfbd-5cce7c4d1065]
Beam job task numbers also 32, but task duration cost nearly 1min.
[cid:_Foxmail.1@fcdaa7f2-7eac-f710-fd0c-796ad4c5bfc7]
You can compare two jobs DAG:
Spark:
[cid:_Foxmail.1@3d0bd3ca-d1b4-6bb4-1c04-8225a3bb3ac0]
Beam :
[cid:_Foxmail.1@3e9ef541-d7d8-051f-6419-84ff2c0e8857]

From: Tim Robertson<ma...@gmail.com>
Date: 2018-09-18 16:55
To: dev@beam.apache.org<ma...@beam.apache.org>
Subject: Re: How to optimize the performance of Beam on Spark(Internet mail)
Hi devinduan

The known issues Robert links there are actually HDFS related and not specific to Spark.  The improvement we're seeking is that the final copy of the output file can be optimised by using a "move" instead of "copy" andI expect to have it fixed for Beam 2.8.0.  On a small dataset like this though, I don't think it will impact performance too much.

Can you please elaborate on your deployment?  It looks like you are using a cluster (i.e. deploy-mode client) but are you using HDFS?

I have access to a Cloudera CDH 5.12 Hadoop cluster and just ran an example word count as follows - I'll explain the parameters to tune below:

1) I generate some random data (using common Hadoop tools)
hadoop jar /opt/cloudera/parcels/CDH/lib/hadoop-0.20-mapreduce/hadoop-examples.jar \
  teragen \
  -Dmapred.map.tasks=100 \
  -Dmapred.map.tasks.speculative.execution=false \
  10000000  \
  /tmp/tera

This puts 100 files totalling just under 1GB on which I will run the word count. They are stored in the HDFS filesystem.

2) Run the word count using Spark (2.3.x) and Beam 2.5.0

In my cluster I have YARN to allocate resources, and an HDFS filesystem. This will be different if you run Spark as standalone, or on a cloud environment.

spark2-submit \
  --conf spark.default.parallelism=45 \
  --class org.apache.beam.runners.spark.examples.WordCount \
  --master yarn \
  --executor-memory 2G \
  --executor-cores 5 \
  --num-executors 9 \
  --jars beam-sdks-java-core-2.5.0.jar,beam-runners-core-construction-java-2.5.0.jar,beam-runners-core-java-2.5.0.jar,beam-sdks-java-io-hadoop-file-system-2.5.0.jar \
  beam-runners-spark-2.5.0.jar \
  --runner=SparkRunner \
  --inputFile=hdfs:///tmp/tera/* \
  --output=hdfs:///tmp/wordcount

The jars I provide here are the minimum needed for running on HDFS with Spark and normally you'd build those into your project as an über jar.

The important bits for tuning for performance are the following - these will be applicable for any Spark deployment (unless embedded):

  spark.default.parallelism - controls the parallelism of the beam pipeline. In this case, how many workers are tokenizing the input data.
  executor-memory, executor-cores, num-executors - controls the resources spark will use

Note, that the parallelism of 45 means that the 5 cores in the 9 executors can all run concurrently (i.e. 5x9 = 45). When you get to very large datasets, you will likely have parallelism much higher.

In this test I see around 20 seconds initial startup of Spark (copying jars, requesting resources from YARN, establishing the Spark context) but once up the job completes in a few seconds writing the output into 45 files (because of the parallelism). The files are named /tmp/wordcount-000*-of-00045.

I hope this helps provide a few pointers, but if you elaborate on your environment we might be able to assist more.

Best wishes,
Tim













On Tue, Sep 18, 2018 at 9:29 AM Robert Bradshaw <ro...@google.com>> wrote:
There are known performance issues with Beam on Spark that are being worked on, e.g. https://issues.apache.org/jira/browse/BEAM-5036 . It's possible you're hitting something different, but would be worth investigating. See also https://lists.apache.org/list.html?dev@beam.apache.org:lte=1M:Performance%20of%20write

On Tue, Sep 18, 2018 at 8:39 AM devinduan(段丁瑞) <de...@tencent.com>> wrote:
Hi,
    I'm testing Beam on Spark.
    I use spark example code WordCount processing 1G data file, cost 1 minutes.
    However, I use Beam example code WordCount processing the same file, cost 30minutes.
    My Spark parameter is :  --deploy-mode client  --executor-memory 1g --num-executors 1 --driver-memory 1g
    My Spark version is 2.3.1,  Beam version is 2.5
    Is there any optimization method?
Thank you.



Re: Re: How to optimize the performance of Beam on Spark(Internet mail)

Posted by Tim Robertson <ti...@gmail.com>.
Thanks for sharing all of that.

Can you please remove the "withNumShards(1)" to verify it performs the same
the wordcount?
It adds an addition groupByKey which you can read about on the JDoc [1],
and will also mean it runs without parallelism for the write.

You are running with a single executor which seems unusual for a spark
cluster. I would recommend running with several, and with the
spark.default.parallelism set.

The issue that Robert linked does apply in your scenario and I believe
having only a single shard output will make that issue worse (it copies all
data through a single "worker").

Thanks,
Tim

[1]
https://beam.apache.org/documentation/sdks/javadoc/2.3.0/org/apache/beam/sdk/io/WriteFiles.html

On Tue, Sep 18, 2018 at 11:21 AM devinduan(段丁瑞) <de...@tencent.com>
wrote:

> Hi Tim
>   Thanks for your reply.
>   I run beam example like :
>   ./spark-submit --master yarn --deploy-mode client  --class
> com.test.BeamTest --executor-memory 1g --num-executors 1 --driver-memory 1g
> --jars /data/mapleleaf/tmp/beam.jar  /data/mapleleaf/tmp/beam.jar
>    BeamTest  code copy from WordCount.  I removed some unused code
> (Metric update).
>
>   I run spark example:
>  ./spark-submit --master yarn --deploy-mode client --class
> org.apache.spark.examples.JavaWordCount --executor-memory 1g
> --num-executors 1 --driver-memory 1g
> /data/mapleleaf/spark/examples/jars/spark-examples_2.11-2.3.0.jar
> hdfs://hdfsTest/test/test.txt
>
>  You can see the result...
>
>
> I'm trying to figure out reason...
> Spark job tasks number is 32, task duration only 0.4s
> Beam job task numbers also 32, but task duration cost nearly 1min.
> You can compare two jobs DAG:
> Spark:
>
> Beam :
>
> *From:* Tim Robertson <ti...@gmail.com>
> *Date:* 2018-09-18 16:55
> *To:* dev@beam.apache.org
> *Subject:* Re: How to optimize the performance of Beam on Spark(Internet
> mail)
> Hi devinduan
>
> The known issues Robert links there are actually HDFS related and not
> specific to Spark.  The improvement we're seeking is that the final copy of
> the output file can be optimised by using a "move" instead of "copy" andI
> expect to have it fixed for Beam 2.8.0.  On a small dataset like this
> though, I don't think it will impact performance too much.
>
> Can you please elaborate on your deployment?  It looks like you are using
> a cluster (i.e. deploy-mode client) but are you using HDFS?
>
> I have access to a Cloudera CDH 5.12 Hadoop cluster and just ran an
> example word count as follows - I'll explain the parameters to tune below:
>
> 1) I generate some random data (using common Hadoop tools)
> hadoop jar
> /opt/cloudera/parcels/CDH/lib/hadoop-0.20-mapreduce/hadoop-examples.jar \
>   teragen \
>   -Dmapred.map.tasks=100 \
>   -Dmapred.map.tasks.speculative.execution=false \
>   10000000  \
>   /tmp/tera
>
> This puts 100 files totalling just under 1GB on which I will run the word
> count. They are stored in the HDFS filesystem.
>
> 2) Run the word count using Spark (2.3.x) and Beam 2.5.0
>
> In my cluster I have YARN to allocate resources, and an HDFS filesystem.
> This will be different if you run Spark as standalone, or on a cloud
> environment.
>
> spark2-submit \
>   --conf spark.default.parallelism=45 \
>   --class org.apache.beam.runners.spark.examples.WordCount \
>   --master yarn \
>   --executor-memory 2G \
>   --executor-cores 5 \
>   --num-executors 9 \
>   --jars
> beam-sdks-java-core-2.5.0.jar,beam-runners-core-construction-java-2.5.0.jar,beam-runners-core-java-2.5.0.jar,beam-sdks-java-io-hadoop-file-system-2.5.0.jar
> \
>   beam-runners-spark-2.5.0.jar \
>   --runner=SparkRunner \
>   --inputFile=hdfs:///tmp/tera/* \
>   --output=hdfs:///tmp/wordcount
>
> The jars I provide here are the minimum needed for running on HDFS with
> Spark and normally you'd build those into your project as an über jar.
>
> The important bits for tuning for performance are the following - these
> will be applicable for any Spark deployment (unless embedded):
>
>   spark.default.parallelism - controls the parallelism of the beam
> pipeline. In this case, how many workers are tokenizing the input data.
>   executor-memory, executor-cores, num-executors - controls the resources
> spark will use
>
> Note, that the parallelism of 45 means that the 5 cores in the 9 executors
> can all run concurrently (i.e. 5x9 = 45). When you get to very large
> datasets, you will likely have parallelism much higher.
>
> In this test I see around 20 seconds initial startup of Spark (copying
> jars, requesting resources from YARN, establishing the Spark context) but
> once up the job completes in a few seconds writing the output into 45 files
> (because of the parallelism). The files are named
> /tmp/wordcount-000*-of-00045.
>
> I hope this helps provide a few pointers, but if you elaborate on your
> environment we might be able to assist more.
>
> Best wishes,
> Tim
>
>
>
>
>
>
>
>
>
>
>
>
>
> On Tue, Sep 18, 2018 at 9:29 AM Robert Bradshaw <ro...@google.com>
> wrote:
>
>> There are known performance issues with Beam on Spark that are being
>> worked on, e.g. https://issues.apache.org/jira/browse/BEAM-5036 . It's
>> possible you're hitting something different, but would be worth
>> investigating. See also
>> https://lists.apache.org/list.html?dev@beam.apache.org:lte=1M:Performance%20of%20write
>>
>> On Tue, Sep 18, 2018 at 8:39 AM devinduan(段丁瑞) <de...@tencent.com>
>> wrote:
>>
>>> Hi,
>>>     I'm testing Beam on Spark.
>>>     I use spark example code WordCount processing 1G data file, cost 1
>>> minutes.
>>>     However, I use Beam example code WordCount processing the same file,
>>> cost 30minutes.
>>>     My Spark parameter is :  --deploy-mode client  --executor-memory 1g
>>> --num-executors 1 --driver-memory 1g
>>>     My Spark version is 2.3.1,  Beam version is 2.5
>>>     Is there any optimization method?
>>> Thank you.
>>>
>>>
>>>
>>