Posted to user@beam.apache.org by Mike Kaplinskiy <mi...@ladderlife.com> on 2021/09/07 21:13:45 UTC

Re: Perf issue with Beam on spark (spark runner)

A long time ago, when I was experimenting with the Spark runner for a batch
job, I noticed that a lot of time was spent in GC as well. In my case I
narrowed it down to how the Spark runner implements Coders.

Spark's value prop is that it only serializes data when it truly has no
other choice - i.e., when it needs to reclaim memory or when it sends things
over the wire. Unfortunately, due to the mismatch in serialization APIs
between Beam and Spark, Beam's Spark runner actually just serializes things
all the time. My theory was that the to/from byte array dance was slow. I
attempted to fix this at https://github.com/apache/beam/pull/8371 but I
could never actually reproduce a speedup in performance benchmarks.
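
(To make the overhead concrete, here is a minimal sketch of that round trip
using Beam's public CoderUtils helpers - an illustration only, not the Spark
runner's actual code path:)

import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.util.CoderUtils;

// Roughly what happens to an element at a shuffle boundary: encode to a
// fresh byte[], then decode it back. Every call allocates short-lived
// garbage, which is what shows up as GC time.
static String roundTrip(String value) throws CoderException {
    Coder<String> coder = StringUtf8Coder.of();
    byte[] bytes = CoderUtils.encodeToByteArray(coder, value);
    return CoderUtils.decodeFromByteArray(coder, bytes);
}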

If you're feeling up to it, you could try reviving something like that PR
and see if it helps.

Mike.

Ladder <http://bit.ly/1VRtWfS>. The smart, modern way to insure your life.


On Sat, Aug 14, 2021 at 4:35 PM Tao Li <ta...@zillow.com> wrote:

> @Alexey Romanenko <ar...@gmail.com> I tried out ParquetIO
> splittable and the processing time improved from 10 min to 6 min, but still
> much longer than 2 min using a native spark app.
>
>
>
> We are still seeing a lot of GC cost from the call stack below. Do you think
> this ticket can fix the issue:
> https://issues.apache.org/jira/browse/BEAM-12646 ? Thanks.
>
>
>
> <image001.png>
>
>
>
>
>
>
>
> *From: *Tao Li <ta...@zillow.com>
> *Reply-To: *"user@beam.apache.org" <us...@beam.apache.org>
> *Date: *Friday, August 6, 2021 at 11:12 AM
> *To: *Alexey Romanenko <ar...@gmail.com>
> *Cc: *"user@beam.apache.org" <us...@beam.apache.org>, Andrew Pilloud <
> apilloud@google.com>, Ismaël Mejía <ie...@gmail.com>, Kyle Weaver <
> kcweaver@google.com>, Yuchu Cao <yu...@trulia.com>
> *Subject: *Re: Perf issue with Beam on spark (spark runner)
>
>
>
> Thanks @Alexey Romanenko <ar...@gmail.com> please see my
> clarifications below.
>
>
>
>
>
> | “Well, of course, if you read all fields (columns) then you don’t need
> column projection. Otherwise, it can give a quite significant performance
> boost, especially for large tables with many columns. “
>
>
>
> [Tao] Basically my perf testing was comparing the beam spark runner and native
> spark. In both the beam app and the native spark app, I was simply reading
> a parquet backed dataset and immediately saving it back to parquet. And we
> were seeing the beam app take 3-5 times longer than native spark. As I have
> shared in this thread previously, the call stack below from the spark runner
> was quite time consuming.
>
>
>
> <image001.png>
>
>
>
>
>
>
>
> | *"Legacy Read transform (non-SDF based Read) is used by default for
> non-FnAPI opensource runners. Use `use_sdf_read` experimental flag to
> re-enable SDF based Read transforms
> ([BEAM-10670](https://issues.apache.org/jira/browse/BEAM-10670))”*
>
>
>
> [Tao] We are not specifying `use_sdf_read` experimental flag in our beam
> app, so we are not using SDF translation.
>
>
>
>
>
>
>
> *From: *Alexey Romanenko <ar...@gmail.com>
> *Date: *Friday, August 6, 2021 at 8:13 AM
> *To: *Tao Li <ta...@zillow.com>
> *Cc: *"user@beam.apache.org" <us...@beam.apache.org>, Andrew Pilloud <
> apilloud@google.com>, Ismaël Mejía <ie...@gmail.com>, Kyle Weaver <
> kcweaver@google.com>, Yuchu Cao <yu...@trulia.com>
> *Subject: *Re: Perf issue with Beam on spark (spark runner)
>
>
>
>
>
>
>
> On 5 Aug 2021, at 18:17, Tao Li <ta...@zillow.com> wrote:
>
>
>
> It was a great presentation!
>
>
>
> Thanks!
>
>
>
>  Regarding my perf testing, I was not doing aggregation, filtering,
> projection or joining. I was simply reading all the fields of parquet and
> then immediately saving the PCollection back to parquet.
>
>
>
> Well, of course, if you read all fields (columns) then you don’t need
> column projection. Otherwise, it can give a quite significant performance
> boost, especially for large tables with many columns.
>
>
>
>
>
> Regarding SDF translation, is it enabled by default?
>
>
>
> From Beam 2.30.0 release notes:
>
>
>
> *"Legacy Read transform (non-SDF based Read) is used by default for
> non-FnAPI opensource runners. Use `use_sdf_read` experimental flag to
> re-enable SDF based Read transforms
> ([BEAM-10670](https://issues.apache.org/jira/browse/BEAM-10670))”*
>
>
>
> *—*
>
> Alexey
>
>
>
>  I will check out ParquetIO splittable. Thanks!
>
>
>
> *From: *Alexey Romanenko <ar...@gmail.com>
> *Date: *Thursday, August 5, 2021 at 6:40 AM
> *To: *Tao Li <ta...@zillow.com>
> *Cc: *"user@beam.apache.org" <us...@beam.apache.org>, Andrew Pilloud <
> apilloud@google.com>, Ismaël Mejía <ie...@gmail.com>, Kyle Weaver <
> kcweaver@google.com>, Yuchu Cao <yu...@trulia.com>
> *Subject: *Re: Perf issue with Beam on spark (spark runner)
>
>
>
> It’s very likely that Spark SQL has much better performance because
> of SQL push-downs and avoidance of additional ser/deser operations.
>
>
>
> At the same time, did you try to leverage "withProjection()” in ParquetIO
> and project only the fields that you needed?
>
>
>
> Did you use ParquetIO splittable (it's not enabled by default, fixed in
> [1])?
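>
> (A minimal sketch combining both suggestions on a direct ParquetIO read;
> projectionSchema is a hypothetical Avro schema listing only the needed
> fields, and avroSchema/beamRequiredPath come from the code earlier in this
> thread:)
>
> import org.apache.avro.generic.GenericRecord;
> import org.apache.beam.sdk.io.parquet.ParquetIO;
>
> // projectionSchema: hypothetical Avro schema with only the needed fields.
> PCollection<GenericRecord> records = pipeline
>         .apply(ParquetIO.read(avroSchema)
>                 .from(beamRequiredPath)
>                 .withSplit()
>                 .withProjection(projectionSchema, projectionSchema));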
>
>
>
> Also, using SDF translation for Read on the Spark Runner can cause performance
> degradation as well (we noticed that in our experiments). Try to use the
> non-SDF read (if you are not already) [2].
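>
> (For reference, a sketch of toggling the experiment - on the command line
> via --experiments=use_sdf_read, or programmatically; leaving it unset keeps
> the legacy non-SDF Read, the default for non-FnAPI runners since Beam
> 2.30.0:)
>
> import org.apache.beam.sdk.options.ExperimentalOptions;
> import org.apache.beam.sdk.options.PipelineOptions;
> import org.apache.beam.sdk.options.PipelineOptionsFactory;
>
> PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
> // Re-enables the SDF-based Read (not what you want here, per the above).
> ExperimentalOptions.addExperiment(
>         options.as(ExperimentalOptions.class), "use_sdf_read");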
>
>
>
>
>
> PS: Yesterday, at Beam Summit, we (Ismael and I) gave a related talk. I’m
> not sure if a recording is available yet, but you can find the slides
> here [3], which may be helpful.
>
>
>
>
>
> —
>
> Alexey
>
>
>
> [1] https://issues.apache.org/jira/browse/BEAM-12070
>
> [2] https://issues.apache.org/jira/browse/BEAM-10670
>
> [3]
> https://drive.google.com/file/d/17rJC0BkxpFFL1abVL01c-D0oHvRRmQ-O/view?usp=sharing
>
>
>
>
>
> On 5 Aug 2021, at 03:07, Tao Li <ta...@zillow.com> wrote:
>
>
>
> @Alexey Romanenko <ar...@gmail.com> @Ismaël Mejía
> <ie...@gmail.com> I assume you are experts on spark runner. Can you
> please take a look at this thread and confirm whether this jira covers the
> cause: https://issues.apache.org/jira/browse/BEAM-12646 ?
>
>
>
> This perf issue is currently a blocker for me.
>
>
>
> Thanks so much!
>
>
>
> *From: *Tao Li <ta...@zillow.com>
> *Reply-To: *"user@beam.apache.org" <us...@beam.apache.org>
> *Date: *Friday, July 30, 2021 at 3:53 PM
> *To: *Andrew Pilloud <ap...@google.com>, "user@beam.apache.org" <
> user@beam.apache.org>
> *Cc: *Kyle Weaver <kc...@google.com>, Yuchu Cao <yu...@trulia.com>
> *Subject: *Re: Perf issue with Beam on spark (spark runner)
>
>
>
> Thanks everyone for your help.
>
>
>
> We actually did another round of perf comparison between Beam (on spark)
> and native spark, without any projection/filtering in the query (to rule
> out the “predicate pushdown” factor).
>
>
>
> Beam with the spark runner is still taking 3-5x the time of native spark,
> and the cause is https://issues.apache.org/jira/browse/BEAM-12646
> according to the spark metrics. The spark runner is pretty much the
> bottleneck.
>
>
>
> <image001.png>
>
>
>
> *From: *Andrew Pilloud <ap...@google.com>
> *Date: *Thursday, July 29, 2021 at 2:11 PM
> *To: *"user@beam.apache.org" <us...@beam.apache.org>
> *Cc: *Tao Li <ta...@zillow.com>, Kyle Weaver <kc...@google.com>, Yuchu
> Cao <yu...@trulia.com>
> *Subject: *Re: Perf issue with Beam on spark (spark runner)
>
>
>
> Actually, ParquetIO got pushdown in Beam SQL starting at v2.29.0.
>
>
>
> Andrew
>
>
>
> On Mon, Jul 26, 2021 at 10:05 AM Andrew Pilloud <ap...@google.com>
> wrote:
>
> Beam SQL doesn't currently have project pushdown for ParquetIO (we are
> working to expand this to more IOs). Using ParquetIO withProjection
> directly will produce better results.
>
>
>
> On Mon, Jul 26, 2021 at 9:46 AM Robert Bradshaw <ro...@google.com>
> wrote:
>
> Could you try using Beam SQL [1] and see if that gives a result more similar
> to your Spark SQL query? I would also be curious whether the performance is
> sufficient using withProjection [2] to only read the auction, price, and
> bidder columns.
>
>
>
> [1] https://beam.apache.org/documentation/dsls/sql/overview/
>
> [2]
> https://beam.apache.org/releases/javadoc/2.25.0/org/apache/beam/sdk/io/parquet/ParquetIO.Read.html#withProjection-org.apache.avro.Schema-org.apache.avro.Schema-
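>
> (A minimal sketch of [1], assuming a schema-aware PCollection<Row> named
> input that has auction, price and bidder fields:)
>
> import org.apache.beam.sdk.extensions.sql.SqlTransform;
>
> // A single input PCollection is addressable as PCOLLECTION in the query.
> PCollection<Row> result = input.apply(
>         SqlTransform.query(
>                 "SELECT auction, 0.82 * price AS euro, bidder FROM PCOLLECTION"));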
>
>
>
> On Sat, Jul 24, 2021 at 10:23 AM Tao Li <ta...@zillow.com> wrote:
>
> Thanks Robert for filing BEAM-12646
> (https://issues.apache.org/jira/browse/BEAM-12646).
> This perf issue is a blocker for us to adopt Beam. It would be great if the
> community could conclude the root cause and share an ETA for the fix.
> Thanks so much!
>
>
>
>
>
> *From: *Robert Bradshaw <ro...@google.com>
> *Date: *Wednesday, July 21, 2021 at 3:51 PM
> *To: *Tao Li <ta...@zillow.com>
> *Cc: *"user@beam.apache.org" <us...@beam.apache.org>, Kyle Weaver <
> kcweaver@google.com>, Yuchu Cao <yu...@trulia.com>
> *Subject: *Re: Perf issue with Beam on spark (spark runner)
>
>
>
> On Wed, Jul 21, 2021 at 3:00 PM Tao Li <ta...@zillow.com> wrote:
>
> @Robert Bradshaw <ro...@google.com> with the Spark API, the code is
> actually much simpler. We are just calling the spark SQL API against a hive
> table: spark.sql(“SELECT auction, 0.82*(price) as euro, bidder FROM bid”)
>
>
>
> Good chance that this is pushing projection of those few fields up into
> the read operator, which could be a dramatic savings. You could try doing
> it manually in Beam, or use Beam SQL, which should do the same.
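>
> (A minimal sketch of the manual route, assuming a schema-aware
> PCollection<Row> named input; Beam's Select transform trims the unused
> columns early, though unlike Spark's pushdown this happens after the read
> rather than inside it:)
>
> import org.apache.beam.sdk.schemas.transforms.Select;
>
> // Keep only the three fields the query needs.
> PCollection<Row> projected =
>         input.apply(Select.fieldNames("auction", "price", "bidder"));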
>
>
>
>
>
> I think the “globally windowed GBK” optimization you are proposing is a
> good callout.
>
>
>
> Filed https://issues.apache.org/jira/browse/BEAM-12646
>  to track.
>
>
>
>
>
> *From: *Robert Bradshaw <ro...@google.com>
> *Reply-To: *"user@beam.apache.org" <us...@beam.apache.org>
> *Date: *Wednesday, July 21, 2021 at 1:09 PM
> *To: *user <us...@beam.apache.org>
> *Cc: *Kyle Weaver <kc...@google.com>, Yuchu Cao <yu...@trulia.com>
> *Subject: *Re: Perf issue with Beam on spark (spark runner)
>
>
>
> On Wed, Jul 21, 2021 at 12:51 PM Tao Li <ta...@zillow.com> wrote:
>
> Kyle, I don’t expect such a huge perf diff either. To your question, no, I
> am not specifying withProjection or withSplit for the parquet reader.
>
>
>
> Are you doing so in your Spark code?
>
>
>
> Below is my parquet read code:
>
>
>
> PCollection<FileIO.ReadableFile> files = pipeline
>         .apply(FileIO.match().filepattern(beamRequiredPath))
>         .apply(FileIO.readMatches());
>
> PCollection<Row> table = files
>         .apply(ParquetIO
>                 .readFiles(avroSchema)
>                 .withConfiguration(ImmutableMap.of(AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS, "false")))
>         .apply(MapElements
>                 .into(TypeDescriptors.rows())
>                 .via(AvroUtils.getGenericRecordToRowFunction(AvroUtils.toBeamSchema(avroSchema))))
>         .setCoder(RowCoder.of(AvroUtils.toBeamSchema(avroSchema)));
>
>
>
>
>
> According to my investigation, it looks like the call stack below is very
> computation intensive and causes a lot of GC time. And it looks like the
> stack comes from spark runner code.
>
>
>
> This does look inordinately expensive. I wonder if it would make sense to
> optimize the globally windowed GBK as some other runners do.
>
>
>
>
>
> <image001.png>
>
>
>
> *From: *Kyle Weaver <kc...@google.com>
> *Date: *Tuesday, July 20, 2021 at 3:57 PM
> *To: *Tao Li <ta...@zillow.com>
> *Cc: *"user@beam.apache.org" <us...@beam.apache.org>, Yuchu Cao <
> yuchuc@trulia.com>
> *Subject: *Re: Perf issue with Beam on spark (spark runner)
>
>
>
> Beam has its own implementation of Parquet IO, and doesn't use Spark's.
> It's possible Spark's implementation does more optimizations, though
> perhaps not enough to result in such a dramatic difference.
>
>
>
> I'm curious how your Parquet read is configured. In particular,
> if withProjection or withSplit are set.
>
>
>
> On Tue, Jul 20, 2021 at 3:21 PM Tao Li <ta...@zillow.com> wrote:
>
> Hi Kyle,
>
>
>
> The ParDo (which references the code I shared) is the only transformation
> in my pipeline. The input and output are parquet files in S3 (we are using
> beam ParquetIO).
>
>
>
> *From: *Kyle Weaver <kc...@google.com>
> *Reply-To: *"user@beam.apache.org" <us...@beam.apache.org>
> *Date: *Tuesday, July 20, 2021 at 2:13 PM
> *To: *"user@beam.apache.org" <us...@beam.apache.org>
> *Cc: *Yuchu Cao <yu...@trulia.com>
> *Subject: *Re: Perf issue with Beam on spark (spark runner)
>
>
>
> The DoFn you shared is simple enough that it seems unlikely to be the
> performance bottleneck here.
>
>
>
> Can you share more information about your complete pipeline? What other
> transforms are there? What sources/sinks are you using?
>
>
>
> On Tue, Jul 20, 2021 at 2:02 PM Tao Li <ta...@zillow.com> wrote:
>
> Hi Beam community,
>
>
>
> We are seeing a serious perf issue with beam using spark runner, compared
> with writing a native spark app. Can you please provide some help?
>
>
>
> The beam on spark app is taking 8-10 min, whereas the native spark app is
> only taking 2 min. Below is the Spark UI, from which you can see that the
> flatMapToPair method is very time consuming. Is this method call coming from
> the spark runner?
>
>
>
> <image001.png>
>
>
>
> I suspect this is caused by high GC time. See “GC Time” column below:
>
>
>
> <image002.png>
>
>
>
>
>
> The beam code is really simple, just per-row processing.
>
>
>
> public class CalcFn extends DoFn<Row, Row> {
>
>     protected Logger log = LoggerFactory.getLogger(this.getClass());
>     private Schema schema;
>
>     public CalcFn(Schema schema) {
>         this.schema = schema;
>     }
>
>     @ProcessElement
>     public void processElement(@Element Row row, OutputReceiver<Row> receiver) {
>         Long auction_value = (Long) row.getBaseValue("auction");
>         Long bid_value = (Long) row.getBaseValue("bidder");
>         Long price = (Long) row.getBaseValue("price");
>         Double euro = price * 0.82;
>
>         receiver.output(Row.withSchema(schema)
>                 .addValues(auction_value, euro, bid_value).build());
>     }
> }
>
>
>

Re: Perf issue with Beam on spark (spark runner)

Posted by Robert Bradshaw <ro...@google.com>.
No, I have not had a chance to benchmark this yet.

On Tue, Oct 12, 2021 at 9:00 AM Alexey Romanenko
<ar...@gmail.com> wrote:
>
> Robert,
>
> Do you have any numbers by chance regarding this optimisation?
>
> Alexey
>
> On 5 Oct 2021, at 00:27, Robert Bradshaw <ro...@google.com> wrote:
>
> https://github.com/apache/beam/pull/15637 might help some.
>
> On Thu, Sep 9, 2021 at 5:21 PM Tao Li <ta...@zillow.com> wrote:
>>
>> Thanks Mike for this info!

Re: Perf issue with Beam on spark (spark runner)

Posted by Alexey Romanenko <ar...@gmail.com>.
Robert,

Do you have any numbers by chance regarding this optimisation?

Alexey

> On 5 Oct 2021, at 00:27, Robert Bradshaw <ro...@google.com> wrote:
> 
> https://github.com/apache/beam/pull/15637 might help some.
> 
> On Thu, Sep 9, 2021 at 5:21 PM Tao Li <ta...@zillow.com> wrote:
> Thanks Mike for this info!
> 
>  
> 
> From: Mike Kaplinskiy <mike@ladderlife.com <ma...@ladderlife.com>>
> Reply-To: "user@beam.apache.org <ma...@beam.apache.org>" <user@beam.apache.org <ma...@beam.apache.org>>
> Date: Tuesday, September 7, 2021 at 2:15 PM
> To: "user@beam.apache.org <ma...@beam.apache.org>" <user@beam.apache.org <ma...@beam.apache.org>>
> Cc: Alexey Romanenko <aromanenko.dev@gmail.com <ma...@gmail.com>>, Andrew Pilloud <apilloud@google.com <ma...@google.com>>, Ismaël Mejía <iemejia@gmail.com <ma...@gmail.com>>, Kyle Weaver <kcweaver@google.com <ma...@google.com>>, Yuchu Cao <yuchuc@trulia.com <ma...@trulia.com>>
> Subject: Re: Perf issue with Beam on spark (spark runner)
> 
>  
> 
> A long time ago when I was experimenting with the Spark runner for a batch job, I noticed that a lot of time was spend in GC as well. In my case I narrowed it down to how the Spark runner implements Coders. 
> 
>  
> 
> Spark's value prop is that it only serializes data when it truly has no other choice - i.e. when it needs to reclaim memory or when it sends things over the wire. Unfortunately due to the mismatch in serialization APIs between Beam and Spark, Beam's Spark runner actually just serializes things all the time. My theory was that the to/from byte array dance was slow. I attempted to fix this at https://github.com/apache/beam/pull/8371 <https://nam11.safelinks.protection.outlook.com/?url=https%3A%2F%2Fgithub.com%2Fapache%2Fbeam%2Fpull%2F8371&data=04%7C01%7Ctaol%40zillow.com%7Cfc203509e0994ed0fc8b08d9724473bc%7C033464830d1840e7a5883784ac50e16f%7C0%7C0%7C637666461187448677%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C3000&sdata=ZtgDb0R3gjSHVU1rpp6T0ZVl7ZhXXRhH%2BqFMX8Z1z%2Bo%3D&reserved=0> but I could never actually reproduce a speedup in performance benchmarks.
> 
>  
> 
> If you're feeling up to it, you could try reviving something like that PR and see if it helps.
> 
>  
> 
> Mike.
> 
> Ladder <https://nam11.safelinks.protection.outlook.com/?url=http%3A%2F%2Fbit.ly%2F1VRtWfS&data=04%7C01%7Ctaol%40zillow.com%7Cfc203509e0994ed0fc8b08d9724473bc%7C033464830d1840e7a5883784ac50e16f%7C0%7C0%7C637666461187458627%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C3000&sdata=RBjmeAAqHdrZmXZEP7ONXwXZyLOwwx6tQbST%2Bs6wq2Q%3D&reserved=0>. The smart, modern way to insure your life.
> 
>  
> 
>  
> 
> On Sat, Aug 14, 2021 at 4:35 PM Tao Li <taol@zillow.com <ma...@zillow.com>> wrote:
> 
> @Alexey Romanenko <ma...@gmail.com> I tried out ParquetIO splittable and the processing time improved from 10 min to 6 min, but still much longer than 2 min using a native spark app.
> 
>  
> 
> We are still seeing a lot of GC cost from below call stack. Do you think this ticket can fix this issue https://issues.apache.org/jira/browse/BEAM-12646 <https://nam11.safelinks.protection.outlook.com/?url=https%3A%2F%2Fissues.apache.org%2Fjira%2Fbrowse%2FBEAM-12646&data=04%7C01%7Ctaol%40zillow.com%7Cfc203509e0994ed0fc8b08d9724473bc%7C033464830d1840e7a5883784ac50e16f%7C0%7C0%7C637666461187458627%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C3000&sdata=Y4OpoFWLzBOf9Lfzg%2BBc%2ByTSsnIh%2FQVU4FSfrU93L%2F0%3D&reserved=0> ? Thanks.
> 
>  
> 
> <image001.png>
> 
>  
> 
>  
> 
>  
> 
> From: Tao Li <taol@zillow.com <ma...@zillow.com>>
> Reply-To: "user@beam.apache.org <ma...@beam.apache.org>" <user@beam.apache.org <ma...@beam.apache.org>>
> Date: Friday, August 6, 2021 at 11:12 AM
> To: Alexey Romanenko <aromanenko.dev@gmail.com <ma...@gmail.com>>
> Cc: "user@beam.apache.org <ma...@beam.apache.org>" <user@beam.apache.org <ma...@beam.apache.org>>, Andrew Pilloud <apilloud@google.com <ma...@google.com>>, Ismaël Mejía <iemejia@gmail.com <ma...@gmail.com>>, Kyle Weaver <kcweaver@google.com <ma...@google.com>>, Yuchu Cao <yuchuc@trulia.com <ma...@trulia.com>>
> Subject: Re: Perf issue with Beam on spark (spark runner)
> 
>  
> 
> Thanks @Alexey Romanenko <ma...@gmail.com> please see my clarifications below.
> 
>  
> 
>  
> 
> | “Well, of course, if you read all fields (columns) then you don’t need column projection. Otherwise, it can give a quite significant performance boost, especially for large tables with many columns. “
> 
>  
> 
> [Tao] Basically my perf testing was comparing beam spark runner and native spark. In both the beam app and the native spark app, I was simply reading a parquet backed dataset and immediately saving it back to parquet. And we were seeing the beam app took 3-5 times longer than native spark. As I have shared in this thread previously, below call stack from spark runner was quite time consuming..
> 
>  
> 
> <image001.png>
> 
>  
> 
>  
> 
>  
> 
> | "Legacy Read transform (non-SDF based Read) is used by default for non-FnAPI opensource runners. Use `use_sdf_read` experimental flag to re-enable SDF based Read transforms ([BEAM-10670](https://issues.apache.org/jira/browse/BEAM-10670 <https://nam11.safelinks.protection.outlook.com/?url=https%3A%2F%2Fissues.apache.org%2Fjira%2Fbrowse%2FBEAM-10670&data=04%7C01%7Ctaol%40zillow.com%7Cfc203509e0994ed0fc8b08d9724473bc%7C033464830d1840e7a5883784ac50e16f%7C0%7C0%7C637666461187468581%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C3000&sdata=flZziWvOW842LjMedT6Zgj8Xj23tAE43pUxqGe43aIE%3D&reserved=0>))”
> 
>  
> 
> [Tao] We are not specifying `use_sdf_read` experimental flag in our beam app, so we are not using SDF translation.
> 
>  
> 
>  
> 
>  
> 
> From: Alexey Romanenko <aromanenko.dev@gmail.com <ma...@gmail.com>>
> Date: Friday, August 6, 2021 at 8:13 AM
> To: Tao Li <taol@zillow.com <ma...@zillow.com>>
> Cc: "user@beam.apache.org <ma...@beam.apache.org>" <user@beam.apache.org <ma...@beam.apache.org>>, Andrew Pilloud <apilloud@google.com <ma...@google.com>>, Ismaël Mejía <iemejia@gmail.com <ma...@gmail.com>>, Kyle Weaver <kcweaver@google.com <ma...@google.com>>, Yuchu Cao <yuchuc@trulia.com <ma...@trulia.com>>
> Subject: Re: Perf issue with Beam on spark (spark runner)
> 
>  
> 
>  
> 
>  
> 
> On 5 Aug 2021, at 18:17, Tao Li <taol@zillow.com <ma...@zillow.com>> wrote:
> 
>  
> 
> It was a great presentation!
> 
>  
> 
> Thanks!
> 
>  
> 
>  Regarding my perf testing, I was not doing aggregation, filtering, projection or joining. I was simply reading all the fields of parquet and then immediately save PCollection back to parquet.
> 
>  
> 
> Well, of course, if you read all fields (columns) then you don’t need column projection. Otherwise, it can give a quite significant performance boost, especially for large tables with many columns. 
> 
>  
> 
>  
> 
> Regarding SDF translation, is it enabled by default?
> 
>  
> 
> From Beam 2.30.0 release notes:
> 
>  
> 
> "Legacy Read transform (non-SDF based Read) is used by default for non-FnAPI opensource runners. Use `use_sdf_read` experimental flag to re-enable SDF based Read transforms ([BEAM-10670](https://issues.apache.org/jira/browse/BEAM-10670 <https://nam11.safelinks.protection.outlook.com/?url=https%3A%2F%2Fissues.apache.org%2Fjira%2Fbrowse%2FBEAM-10670&data=04%7C01%7Ctaol%40zillow.com%7Cfc203509e0994ed0fc8b08d9724473bc%7C033464830d1840e7a5883784ac50e16f%7C0%7C0%7C637666461187468581%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C3000&sdata=flZziWvOW842LjMedT6Zgj8Xj23tAE43pUxqGe43aIE%3D&reserved=0>))”
> 
>  
> 
> —
> 
> Alexey
> 
>  
> 
>  I will check out ParquetIO splittable. Thanks!
> 
>  
> 
> From: Alexey Romanenko <aromanenko.dev@gmail.com <ma...@gmail.com>>
> Date: Thursday, August 5, 2021 at 6:40 AM
> To: Tao Li <taol@zillow.com <ma...@zillow.com>>
> Cc: "user@beam.apache.org <ma...@beam.apache.org>" <user@beam.apache.org <ma...@beam.apache.org>>, Andrew Pilloud <apilloud@google.com <ma...@google.com>>, Ismaël Mejía <iemejia@gmail.com <ma...@gmail.com>>, Kyle Weaver <kcweaver@google.com <ma...@google.com>>, Yuchu Cao <yuchuc@trulia.com <ma...@trulia.com>>
> Subject: Re: Perf issue with Beam on spark (spark runner)
> 
>  
> 
> It’s very likely that Spark SQL may have much better performance because of SQL push-downs and avoiding additional ser/deser operations.
> 
>  
> 
> In the same time, did you try to leverage "withProjection()” in ParquetIO and project only the fields that you needed? 
> 
>  
> 
> Did you use ParquetIO splittable (it's not enabled by default, fixed in [1])?
> 
>  
> 
> Also, using SDF translation for Read on Spark Runner can cause performance degradation as well (we noticed that in our experiments). Try to use non-SDF read (if not yet) [2]
> 
>  
> 
>  
> 
> PS: Yesterday, on Beam Summit, we (Ismael and me) gave a related talk. I’m not sure if a recording is already available but you can find the slides here [3] that can be helpful.
> 
>  
> 
>  
> 
> —
> 
> Alexey
> 
>  
> 
> [1] https://issues.apache.org/jira/browse/BEAM-12070 <https://nam11.safelinks.protection.outlook.com/?url=https%3A%2F%2Fissues.apache.org%2Fjira%2Fbrowse%2FBEAM-12070&data=04%7C01%7Ctaol%40zillow.com%7Cfc203509e0994ed0fc8b08d9724473bc%7C033464830d1840e7a5883784ac50e16f%7C0%7C0%7C637666461187478539%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C3000&sdata=7SR8gicpWaWcvsTvbdlJmPIYnLx6FAY%2FPD2w3ZcDgr4%3D&reserved=0>
> [2] https://issues.apache.org/jira/browse/BEAM-10670 <https://nam11.safelinks.protection.outlook.com/?url=https%3A%2F%2Fissues.apache.org%2Fjira%2Fbrowse%2FBEAM-10670&data=04%7C01%7Ctaol%40zillow.com%7Cfc203509e0994ed0fc8b08d9724473bc%7C033464830d1840e7a5883784ac50e16f%7C0%7C0%7C637666461187478539%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C3000&sdata=aeqziE3fpMhlX%2Bv6P9WNv7Zo8wN6V1KPMZwZoIDIEqg%3D&reserved=0>
> [3] https://drive.google.com/file/d/17rJC0BkxpFFL1abVL01c-D0oHvRRmQ-O/view?usp=sharing <https://nam11.safelinks.protection.outlook.com/?url=https%3A%2F%2Fdrive.google.com%2Ffile%2Fd%2F17rJC0BkxpFFL1abVL01c-D0oHvRRmQ-O%2Fview%3Fusp%3Dsharing&data=04%7C01%7Ctaol%40zillow.com%7Cfc203509e0994ed0fc8b08d9724473bc%7C033464830d1840e7a5883784ac50e16f%7C0%7C0%7C637666461187478539%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C3000&sdata=R54%2FpJjYkpUgmqx1OLTKH%2BWpKc%2FqwtXWUs6Ccfp8%2F2g%3D&reserved=0>
>  
> 
>  
> 
> On 5 Aug 2021, at 03:07, Tao Li <taol@zillow.com <ma...@zillow.com>> wrote:
> 
>  
> 
> @Alexey Romanenko <ma...@gmail.com> @Ismaël Mejía <ma...@gmail.com> I assume you are experts on spark runner. Can you please take a look at this thread and confirm this jira covers the causes https://issues.apache.org/jira/browse/BEAM-12646 <https://nam11.safelinks.protection.outlook.com/?url=https%3A%2F%2Fissues.apache.org%2Fjira%2Fbrowse%2FBEAM-12646&data=04%7C01%7Ctaol%40zillow.com%7Cfc203509e0994ed0fc8b08d9724473bc%7C033464830d1840e7a5883784ac50e16f%7C0%7C0%7C637666461187488499%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C3000&sdata=x3wE0UIwjU4ACQNPW0zb2%2F4oUi6nzRe9mhy5qSsQQA0%3D&reserved=0> ?
> 
>  
> 
> This perf issue is currently a blocker to me..
> 
>  
> 
> Thanks so much!
> 
>  
> 
> From: Tao Li <taol@zillow.com <ma...@zillow.com>>
> Reply-To: "user@beam.apache.org <ma...@beam.apache.org>" <user@beam.apache.org <ma...@beam.apache.org>>
> Date: Friday, July 30, 2021 at 3:53 PM
> To: Andrew Pilloud <apilloud@google.com <ma...@google.com>>, "user@beam.apache.org <ma...@beam.apache.org>" <user@beam.apache.org <ma...@beam.apache.org>>
> Cc: Kyle Weaver <kcweaver@google.com <ma...@google.com>>, Yuchu Cao <yuchuc@trulia.com <ma...@trulia.com>>
> Subject: Re: Perf issue with Beam on spark (spark runner)
> 
>  
> 
> Thanks everyone for your help.
> 
>  
> 
> We actually did another round of perf comparison between Beam (on spark) and native spark, without any projection/filtering in the query (to rule out the “predicate pushdown” factor).
> 
>  
> 
> The time spent on Beam with spark runner is still taking 3-5x period of time compared with native spark, and the cause ishttps://issues.apache.org/jira/browse/BEAM-12646 <https://nam11.safelinks.protection.outlook.com/?url=https%3A%2F%2Fissues.apache.org%2Fjira%2Fbrowse%2FBEAM-12646&data=04%7C01%7Ctaol%40zillow.com%7Cfc203509e0994ed0fc8b08d9724473bc%7C033464830d1840e7a5883784ac50e16f%7C0%7C0%7C637666461187488499%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C3000&sdata=x3wE0UIwjU4ACQNPW0zb2%2F4oUi6nzRe9mhy5qSsQQA0%3D&reserved=0> according to the spark metrics. Spark runner is pretty much the bottleneck.
> 
>  
> 
> <image001.png>
> 
>  
> 
> From: Andrew Pilloud <apilloud@google.com <ma...@google.com>>
> Date: Thursday, July 29, 2021 at 2:11 PM
> To: "user@beam.apache.org <ma...@beam.apache.org>" <user@beam.apache.org <ma...@beam.apache.org>>
> Cc: Tao Li <taol@zillow.com <ma...@zillow.com>>, Kyle Weaver <kcweaver@google.com <ma...@google.com>>, Yuchu Cao <yuchuc@trulia.com <ma...@trulia.com>>
> Subject: Re: Perf issue with Beam on spark (spark runner)
> 
>  
> 
> Actually, ParquetIO got pushdown in Beam SQL starting at v2.29.0.
> 
>  
> 
> Andrew
> 
>  
> 
> On Mon, Jul 26, 2021 at 10:05 AM Andrew Pilloud <apilloud@google.com <ma...@google.com>> wrote:
> 
> Beam SQL doesn't currently have project pushdown for ParquetIO (we are working to expand this to more IOs). Using ParquetIO withProjection directly will produce better results.
> 
>  
> 
> On Mon, Jul 26, 2021 at 9:46 AM Robert Bradshaw <robertwb@google.com <ma...@google.com>> wrote:
> 
> Could you try using Beam SQL [1] and see if that gives more similar result to your Spark SQL query? I would also be curious if the performance is sufficient using withProjection to only read the auction, price, and bidder columns. 
> 
>  
> 
> [1] https://beam.apache.org/documentation/dsls/sql/overview/ <https://nam11.safelinks.protection.outlook.com/?url=https%3A%2F%2Fbeam.apache.org%2Fdocumentation%2Fdsls%2Fsql%2Foverview%2F&data=04%7C01%7Ctaol%40zillow.com%7Cfc203509e0994ed0fc8b08d9724473bc%7C033464830d1840e7a5883784ac50e16f%7C0%7C0%7C637666461187498450%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C3000&sdata=Wth3BzRulruxGaJSbmiTCQZj3S76E%2F6VdXTYsHTZExg%3D&reserved=0>
> [2] https://beam.apache.org/releases/javadoc/2.25.0/org/apache/beam/sdk/io/parquet/ParquetIO.Read.html#withProjection-org.apache.avro.Schema-org.apache.avro.Schema- <https://nam11.safelinks.protection.outlook.com/?url=https%3A%2F%2Fbeam.apache.org%2Freleases%2Fjavadoc%2F2.25.0%2Forg%2Fapache%2Fbeam%2Fsdk%2Fio%2Fparquet%2FParquetIO.Read.html%23withProjection-org.apache.avro.Schema-org.apache.avro.Schema-&data=04%7C01%7Ctaol%40zillow.com%7Cfc203509e0994ed0fc8b08d9724473bc%7C033464830d1840e7a5883784ac50e16f%7C0%7C0%7C637666461187498450%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C3000&sdata=RrEBGsGMZLzLXaxoC2rSWtGKZ%2BwsjYAHFGeJxyXVrUQ%3D&reserved=0>
>  
> 
> On Sat, Jul 24, 2021 at 10:23 AM Tao Li <taol@zillow.com <ma...@zillow.com>> wrote:
> 
> Thanks Robert for filing BEAM-12646 <https://nam11.safelinks.protection.outlook.com/?url=https%3A%2F%2Fissues.apache.org%2Fjira%2Fbrowse%2FBEAM-12646&data=04%7C01%7Ctaol%40zillow.com%7Cfc203509e0994ed0fc8b08d9724473bc%7C033464830d1840e7a5883784ac50e16f%7C0%7C0%7C637666461187508407%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C3000&sdata=KFZLNEj382MLIx2dEQEcVEKObN5IU1ZgXCuZiFpNn1Q%3D&reserved=0>. This perf issue is a blocker for us to adopt Beam. It would be great if the community could conclude the root cause and share an ETA for the fix. Thanks so much!  
> 
>  
> 
>  
> 
> From: Robert Bradshaw <robertwb@google.com <ma...@google.com>>
> Date: Wednesday, July 21, 2021 at 3:51 PM
> To: Tao Li <taol@zillow.com <ma...@zillow.com>>
> Cc: "user@beam.apache.org <ma...@beam.apache.org>" <user@beam.apache.org <ma...@beam.apache.org>>, Kyle Weaver <kcweaver@google.com <ma...@google.com>>, Yuchu Cao <yuchuc@trulia.com <ma...@trulia.com>>
> Subject: Re: Perf issue with Beam on spark (spark runner)
> 
>  
> 
> On Wed, Jul 21, 2021 at 3:00 PM Tao Li <taol@zillow.com <ma...@zillow.com>> wrote:
> 
> @Robert Bradshaw <ma...@google.com> with Spark API, the code is actually much simple. We are just calling spark SQL API against a hive table: spark.sql(“SELECT auction, 0.82*(price) as euro, bidder  FROM bid”)
> 
>  
> 
> Good chance that this is pushing projection of those few fields up into the read operator, which could be a dramatic savings. You could try doing it manually in Beam, or use Beam's SQL that should do the same. 
> 
>  
> 
>  
> 
> I think the “globally windowed GBK” optimization you are proposing is a good callout.
> 
>  
> 
> Filed https://issues.apache.org/jira/browse/BEAM-12646 <https://nam11.safelinks.protection.outlook.com/?url=https%3A%2F%2Fissues.apache.org%2Fjira%2Fbrowse%2FBEAM-12646&data=04%7C01%7Ctaol%40zillow.com%7Cfc203509e0994ed0fc8b08d9724473bc%7C033464830d1840e7a5883784ac50e16f%7C0%7C0%7C637666461187508407%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C3000&sdata=KFZLNEj382MLIx2dEQEcVEKObN5IU1ZgXCuZiFpNn1Q%3D&reserved=0> to track. 
> 
>  
> 
>  
> 
> From: Robert Bradshaw <robertwb@google.com <ma...@google.com>>
> Reply-To: "user@beam.apache.org <ma...@beam.apache.org>" <user@beam.apache.org <ma...@beam.apache.org>>
> Date: Wednesday, July 21, 2021 at 1:09 PM
> To: user <user@beam.apache.org <ma...@beam.apache.org>>
> Cc: Kyle Weaver <kcweaver@google.com <ma...@google.com>>, Yuchu Cao <yuchuc@trulia.com <ma...@trulia.com>>
> Subject: Re: Perf issue with Beam on spark (spark runner)
> 
>  
> 
> On Wed, Jul 21, 2021 at 12:51 PM Tao Li <taol@zillow.com <ma...@zillow.com>> wrote:
> 
> Kyle, I don’t expect such a huge perf diff as well. To your question, no I am not specifying withProjection or withSplit for parquet reader.
> 
>  
> 
> Are you doing so in your Spark code? 
> 
>  
> 
> Below is my parquet read code:
> 
>  
> 
> PCollection<FileIO.ReadableFile> files = pipeline
> 
>                 .apply(FileIO.match().filepattern(beamRequiredPath))
> 
>                 .apply(FileIO.readMatches());
> 
>  
> 
> PCollection<Row> table = files
> 
>                 .apply(ParquetIO
> 
>                         .readFiles(avroSchema)
> 
>                         .withConfiguration(ImmutableMap.of(AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS, "false")))
> 
>                 .apply(MapElements
> 
>                         .into(TypeDescriptors.rows())
> 
>                         .via(AvroUtils.getGenericRecordToRowFunction(AvroUtils.toBeamSchema(avroSchema))))
> 
>                 .setCoder(RowCoder.of(AvroUtils.toBeamSchema(avroSchema)));
> 
>  
> 
>  
> 
> According to my investigation, looks like below call stack is very computation intensive and causing a lot of GC time. And looks like the stack comes from spark runner code.
> 
>  
> 
> This does look inordinately expensive. I wonder if it would make sense to optimize the globally windowed GBK as some other runners do. 
> 
>  
> 
>  
> 
> <image001.png>
> 
>  
> 
> From: Kyle Weaver <kcweaver@google.com <ma...@google.com>>
> Date: Tuesday, July 20, 2021 at 3:57 PM
> To: Tao Li <taol@zillow.com <ma...@zillow.com>>
> Cc: "user@beam.apache.org <ma...@beam.apache.org>" <user@beam.apache.org <ma...@beam.apache.org>>, Yuchu Cao <yuchuc@trulia.com <ma...@trulia.com>>
> Subject: Re: Perf issue with Beam on spark (spark runner)
> 
>  
> 
> Beam has its own implementation of Parquet IO, and doesn't use Spark's. It's possible Spark's implementation does more optimizations, though perhaps not enough to result in such a dramatic difference.
> 
>  
> 
> I'm curious how your Parquet read is configured. In particular, if withProjection or withSplit are set.
> 
>  
> 
> On Tue, Jul 20, 2021 at 3:21 PM Tao Li <taol@zillow.com <ma...@zillow.com>> wrote:
> 
> Hi Kyle,
> 
>  
> 
> The ParDo (which references the code I shared) is the only transformation in my pipeline. The input and output are parquet files in S3 (we are using beam ParquetIO).
> 
>  
> 
> From: Kyle Weaver <kcweaver@google.com <ma...@google.com>>
> Reply-To: "user@beam.apache.org <ma...@beam.apache.org>" <user@beam.apache.org <ma...@beam.apache.org>>
> Date: Tuesday, July 20, 2021 at 2:13 PM
> To: "user@beam.apache.org <ma...@beam.apache.org>" <user@beam.apache.org <ma...@beam.apache.org>>
> Cc: Yuchu Cao <yuchuc@trulia.com <ma...@trulia.com>>
> Subject: Re: Perf issue with Beam on spark (spark runner)
> 
>  
> 
> The DoFn you shared is simple enough that it seems unlikely to be the performance bottleneck here.
> 
>  
> 
> Can you share more information about your complete pipeline? What other transforms are there? What sources/sinks are you using?
> 
>  
> 
> On Tue, Jul 20, 2021 at 2:02 PM Tao Li <taol@zillow.com <ma...@zillow.com>> wrote:
> 
> Hi Beam community,
> 
>  
> 
> We are seeing a serious perf issue with beam using spark runner, compared with writing a native spark app. Can you please provide some help?
> 
>  
> 
> The beam on spark app is taking 8-10 min, whereas a native spark is only taking 2 min. Below is Spark UI, from which you can see the flatMapToPair method is very time consuming. Is this method call coming from spark runner?
> 
>  
> 
> <image001.png>
> 
>  
> 
> I suspect this is caused by high GC time. See “GC Time” column below:
> 
>  
> 
> <image002.png>
> 
>  
> 
>  
> 
> The beam code is really simple, just a per row processing.
> 
>  
> 
> public class CalcFn extends DoFn<Row, Row> {
> 
>     protected Logger log = LoggerFactory.getLogger(this.getClass());
> 
>     private Schema schema;
> 
>  
> 
>     public CalcFn(Schema schema) {
> 
>         this.schema = schema;
> 
>     }
> 
>  
> 
>     @ProcessElement
> 
>     public void processElement(@Element Row row,OutputReceiver<Row> receiver) {
> 
>        // Row row = ctx.element();
> 
>         Long auction_value = (Long) row.getBaseValue("auction");
> 
>         Long bid_value = (Long) row.getBaseValue("bidder");
> 
>         Long price = (Long) row.getBaseValue("price");
> 
>         Double euro = price * 0.82;
> 
>  
> 
>         receiver.output( Row.withSchema(schema)
> 
>                 .addValues(auction_value, euro, bid_value).build());
> 
>     }
> 
> }
> 
>  
> 


Re: Perf issue with Beam on spark (spark runner)

Posted by Robert Bradshaw <ro...@google.com>.
https://github.com/apache/beam/pull/15637 might help some.

On Thu, Sep 9, 2021 at 5:21 PM Tao Li <ta...@zillow.com> wrote:

> Thanks Mike for this info!
>
>
>
> *From: *Mike Kaplinskiy <mi...@ladderlife.com>
> *Reply-To: *"user@beam.apache.org" <us...@beam.apache.org>
> *Date: *Tuesday, September 7, 2021 at 2:15 PM
> *To: *"user@beam.apache.org" <us...@beam.apache.org>
> *Cc: *Alexey Romanenko <ar...@gmail.com>, Andrew Pilloud <
> apilloud@google.com>, Ismaël Mejía <ie...@gmail.com>, Kyle Weaver <
> kcweaver@google.com>, Yuchu Cao <yu...@trulia.com>
> *Subject: *Re: Perf issue with Beam on spark (spark runner)
>
>
>
> A long time ago when I was experimenting with the Spark runner for a batch
> job, I noticed that a lot of time was spent in GC as well. In my case I
> narrowed it down to how the Spark runner implements Coders.
>
>
>
> Spark's value prop is that it only serializes data when it truly has no
> other choice - i.e. when it needs to reclaim memory or when it sends things
> over the wire. Unfortunately due to the mismatch in serialization APIs
> between Beam and Spark, Beam's Spark runner actually just serializes things
> all the time. My theory was that the to/from byte array dance was slow. I
> attempted to fix this at https://github.com/apache/beam/pull/8371
> but I could never actually reproduce a speedup in performance benchmarks.
>
>
>
> If you're feeling up to it, you could try reviving something like that PR
> and see if it helps.
>
>
>
> Mike.
>
> Ladder <http://bit.ly/1VRtWfS>. The smart, modern way to insure your life.
>
>
>
>
>
> On Sat, Aug 14, 2021 at 4:35 PM Tao Li <ta...@zillow.com> wrote:
>
> @Alexey Romanenko <ar...@gmail.com> I tried out ParquetIO
> splittable and the processing time improved from 10 min to 6 min, but still
> much longer than 2 min using a native spark app.
>
>
>
> We are still seeing a lot of GC cost from the call stack below. Do you think
> this ticket can fix the issue
> https://issues.apache.org/jira/browse/BEAM-12646
> ? Thanks.
>
>
>
> [image: call-stack screenshot]
>
>
>
>
>
>
>
> *From: *Tao Li <ta...@zillow.com>
> *Reply-To: *"user@beam.apache.org" <us...@beam.apache.org>
> *Date: *Friday, August 6, 2021 at 11:12 AM
> *To: *Alexey Romanenko <ar...@gmail.com>
> *Cc: *"user@beam.apache.org" <us...@beam.apache.org>, Andrew Pilloud <
> apilloud@google.com>, Ismaël Mejía <ie...@gmail.com>, Kyle Weaver <
> kcweaver@google.com>, Yuchu Cao <yu...@trulia.com>
> *Subject: *Re: Perf issue with Beam on spark (spark runner)
>
>
>
> Thanks @Alexey Romanenko <ar...@gmail.com> please see my
> clarifications below.
>
>
>
>
>
> | “Well, of course, if you read all fields (columns) then you don’t need
> column projection. Otherwise, it can give a quite significant performance
> boost, especially for large tables with many columns. “
>
>
>
> [Tao] Basically my perf testing was comparing beam spark runner and native
> spark. In both the beam app and the native spark app, I was simply reading
> a parquet backed dataset and immediately saving it back to parquet. And we
> were seeing the beam app took 3-5 times longer than native spark. As I have
> shared in this thread previously, the call stack below from the spark runner was
> quite time-consuming.
>
>
>
> [image: call-stack screenshot]
>
>
>
>
>
>
>
> | *"Legacy Read transform (non-SDF based Read) is used by default for
> non-FnAPI opensource runners. Use `use_sdf_read` experimental flag to
> re-enable SDF based Read transforms
> ([BEAM-10670](https://issues.apache.org/jira/browse/BEAM-10670))”*
>
>
>
> [Tao] We are not specifying `use_sdf_read` experimental flag in our beam
> app, so we are not using SDF translation.
>
>
>
>
>
>
>
> *From: *Alexey Romanenko <ar...@gmail.com>
> *Date: *Friday, August 6, 2021 at 8:13 AM
> *To: *Tao Li <ta...@zillow.com>
> *Cc: *"user@beam.apache.org" <us...@beam.apache.org>, Andrew Pilloud <
> apilloud@google.com>, Ismaël Mejía <ie...@gmail.com>, Kyle Weaver <
> kcweaver@google.com>, Yuchu Cao <yu...@trulia.com>
> *Subject: *Re: Perf issue with Beam on spark (spark runner)
>
>
>
>
>
>
>
> On 5 Aug 2021, at 18:17, Tao Li <ta...@zillow.com> wrote:
>
>
>
> It was a great presentation!
>
>
>
> Thanks!
>
>
>
>  Regarding my perf testing, I was not doing aggregation, filtering,
> projection or joining. I was simply reading all the fields of parquet and
> then immediately saving the PCollection back to parquet.
>
>
>
> Well, of course, if you read all fields (columns) then you don’t need
> column projection. Otherwise, it can give a quite significant performance
> boost, especially for large tables with many columns.
>
>
>
>
>
> Regarding SDF translation, is it enabled by default?
>
>
>
> From Beam 2.30.0 release notes:
>
>
>
> *"Legacy Read transform (non-SDF based Read) is used by default for
> non-FnAPI opensource runners. Use `use_sdf_read` experimental flag to
> re-enable SDF based Read transforms
> ([BEAM-10670](https://issues.apache.org/jira/browse/BEAM-10670))”*
>
>
>
> *—*
>
> Alexey
>
>
>
>  I will check out ParquetIO splittable. Thanks!
>
>
>
> *From: *Alexey Romanenko <ar...@gmail.com>
> *Date: *Thursday, August 5, 2021 at 6:40 AM
> *To: *Tao Li <ta...@zillow.com>
> *Cc: *"user@beam.apache.org" <us...@beam.apache.org>, Andrew Pilloud <
> apilloud@google.com>, Ismaël Mejía <ie...@gmail.com>, Kyle Weaver <
> kcweaver@google.com>, Yuchu Cao <yu...@trulia.com>
> *Subject: *Re: Perf issue with Beam on spark (spark runner)
>
>
>
> It’s very likely that Spark SQL may have much better performance because
> of SQL push-downs and avoiding additional ser/deser operations.
>
>
>
> At the same time, did you try to leverage "withProjection()” in ParquetIO
> and project only the fields that you needed?
>
>
>
> Did you use ParquetIO splittable (it's not enabled by default, fixed in
> [1])?
>
>
>
> Also, using SDF translation for Read on Spark Runner can cause performance
> degradation as well (we noticed that in our experiments). Try to use
> the non-SDF read (if you are not already) [2]
>
>
>
>
>
> PS: Yesterday, at Beam Summit, we (Ismael and I) gave a related talk. I’m
> not sure if a recording is available yet, but you can find the slides
> here [3]; they may be helpful.
>
>
>
>
>
> —
>
> Alexey
>
>
>
> [1] https://issues.apache.org/jira/browse/BEAM-12070
>
> [2] https://issues.apache.org/jira/browse/BEAM-10670
>
> [3]
> https://drive.google.com/file/d/17rJC0BkxpFFL1abVL01c-D0oHvRRmQ-O/view?usp=sharing
>
>
>
>
>
> On 5 Aug 2021, at 03:07, Tao Li <ta...@zillow.com> wrote:
>
>
>
> @Alexey Romanenko <ar...@gmail.com> @Ismaël Mejía
> <ie...@gmail.com> I assume you are experts on spark runner. Can you
> please take a look at this thread and confirm this jira covers the causes
> https://issues.apache.org/jira/browse/BEAM-12646
>  ?
>
>
>
> This perf issue is currently a blocker to me..
>
>
>
> Thanks so much!
>
>
>
> *From: *Tao Li <ta...@zillow.com>
> *Reply-To: *"user@beam.apache.org" <us...@beam.apache.org>
> *Date: *Friday, July 30, 2021 at 3:53 PM
> *To: *Andrew Pilloud <ap...@google.com>, "user@beam.apache.org" <
> user@beam.apache.org>
> *Cc: *Kyle Weaver <kc...@google.com>, Yuchu Cao <yu...@trulia.com>
> *Subject: *Re: Perf issue with Beam on spark (spark runner)
>
>
>
> Thanks everyone for your help.
>
>
>
> We actually did another round of perf comparison between Beam (on spark)
> and native spark, without any projection/filtering in the query (to rule
> out the “predicate pushdown” factor).
>
>
>
> The Beam job on the spark runner is still taking 3-5x the time of
> native spark, and the cause is
> https://issues.apache.org/jira/browse/BEAM-12646
>  according to the spark metrics. Spark runner is pretty much the
> bottleneck.
>
>
>
> <image001.png>
>
>
>
> *From: *Andrew Pilloud <ap...@google.com>
> *Date: *Thursday, July 29, 2021 at 2:11 PM
> *To: *"user@beam.apache.org" <us...@beam.apache.org>
> *Cc: *Tao Li <ta...@zillow.com>, Kyle Weaver <kc...@google.com>, Yuchu
> Cao <yu...@trulia.com>
> *Subject: *Re: Perf issue with Beam on spark (spark runner)
>
>
>
> Actually, ParquetIO got pushdown in Beam SQL starting at v2.29.0.
>
>
>
> Andrew
>
>
>
> On Mon, Jul 26, 2021 at 10:05 AM Andrew Pilloud <ap...@google.com>
> wrote:
>
> Beam SQL doesn't currently have project pushdown for ParquetIO (we are
> working to expand this to more IOs). Using ParquetIO withProjection
> directly will produce better results.
>
>
>
> On Mon, Jul 26, 2021 at 9:46 AM Robert Bradshaw <ro...@google.com>
> wrote:
>
> Could you try using Beam SQL [1] and see if that gives a result more similar
> to your Spark SQL query? I would also be curious if the performance is
> sufficient using withProjection [2] to read only the auction, price, and bidder
> columns.
>
>
>
> [1] https://beam.apache.org/documentation/dsls/sql/overview/
>
> [2]
> https://beam.apache.org/releases/javadoc/2.25.0/org/apache/beam/sdk/io/parquet/ParquetIO.Read.html#withProjection-org.apache.avro.Schema-org.apache.avro.Schema-
>
>
>
> On Sat, Jul 24, 2021 at 10:23 AM Tao Li <ta...@zillow.com> wrote:
>
> Thanks Robert for filing BEAM-12646.
> This perf issue is a blocker for us to adopt Beam. It would be great if the
> community could conclude the root cause and share an ETA for the fix.
> Thanks so much!
>
>
>
>
>
> *From: *Robert Bradshaw <ro...@google.com>
> *Date: *Wednesday, July 21, 2021 at 3:51 PM
> *To: *Tao Li <ta...@zillow.com>
> *Cc: *"user@beam.apache.org" <us...@beam.apache.org>, Kyle Weaver <
> kcweaver@google.com>, Yuchu Cao <yu...@trulia.com>
> *Subject: *Re: Perf issue with Beam on spark (spark runner)
>
>
>
> On Wed, Jul 21, 2021 at 3:00 PM Tao Li <ta...@zillow.com> wrote:
>
> @Robert Bradshaw <ro...@google.com> with Spark API, the code is
> actually much simpler. We are just calling the spark SQL API against a hive
> table: spark.sql(“SELECT auction, 0.82*(price) as euro, bidder  FROM bid”)
>
>
>
> Good chance that this is pushing projection of those few fields up into
> the read operator, which could be a dramatic savings. You could try doing
> it manually in Beam, or use Beam SQL, which should do the same.
>
>
>
>
>
> I think the “globally windowed GBK” optimization you are proposing is a
> good callout.
>
>
>
> Filed https://issues.apache.org/jira/browse/BEAM-12646
>  to track.
>
>
>
>
>
> *From: *Robert Bradshaw <ro...@google.com>
> *Reply-To: *"user@beam.apache.org" <us...@beam.apache.org>
> *Date: *Wednesday, July 21, 2021 at 1:09 PM
> *To: *user <us...@beam.apache.org>
> *Cc: *Kyle Weaver <kc...@google.com>, Yuchu Cao <yu...@trulia.com>
> *Subject: *Re: Perf issue with Beam on spark (spark runner)
>
>
>
> On Wed, Jul 21, 2021 at 12:51 PM Tao Li <ta...@zillow.com> wrote:
>
> Kyle, I don’t expect such a huge perf diff either. To your question, no, I
> am not specifying withProjection or withSplit for parquet reader.
>
>
>
> Are you doing so in your Spark code?
>
>
>
> Below is my parquet read code:
>
>
>
> PCollection<FileIO.ReadableFile> files = pipeline
>
>                 .apply(FileIO.match().filepattern(beamRequiredPath))
>
>                 .apply(FileIO.readMatches());
>
>
>
> PCollection<Row> table = files
>
>                 .apply(ParquetIO
>
>                         .readFiles(avroSchema)
>
>                         .withConfiguration(ImmutableMap.of(AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS, "false")))
>
>                 .apply(MapElements
>
>                         .into(TypeDescriptors.rows())
>
>                         .via(AvroUtils.getGenericRecordToRowFunction(AvroUtils.toBeamSchema(avroSchema))))
>
>                 .setCoder(RowCoder.of(AvroUtils.toBeamSchema(avroSchema)));
>
>
>
>
>
> According to my investigation, it looks like the call stack below is very
> computation-intensive and causes a lot of GC time. The stack appears to
> come from the spark runner code.
>
>
>
> This does look inordinately expensive. I wonder if it would make sense to
> optimize the globally windowed GBK as some other runners do.
>
>
>
>
>
> <image001.png>
>
>
>
> *From: *Kyle Weaver <kc...@google.com>
> *Date: *Tuesday, July 20, 2021 at 3:57 PM
> *To: *Tao Li <ta...@zillow.com>
> *Cc: *"user@beam.apache.org" <us...@beam.apache.org>, Yuchu Cao <
> yuchuc@trulia.com>
> *Subject: *Re: Perf issue with Beam on spark (spark runner)
>
>
>
> Beam has its own implementation of Parquet IO, and doesn't use Spark's.
> It's possible Spark's implementation does more optimizations, though
> perhaps not enough to result in such a dramatic difference.
>
>
>
> I'm curious how your Parquet read is configured. In particular,
> if withProjection or withSplit are set.
>
>
>
> On Tue, Jul 20, 2021 at 3:21 PM Tao Li <ta...@zillow.com> wrote:
>
> Hi Kyle,
>
>
>
> The ParDo (which references the code I shared) is the only transformation
> in my pipeline. The input and output are parquet files in S3 (we are using
> beam ParquetIO).
>
>
>
> *From: *Kyle Weaver <kc...@google.com>
> *Reply-To: *"user@beam.apache.org" <us...@beam.apache.org>
> *Date: *Tuesday, July 20, 2021 at 2:13 PM
> *To: *"user@beam.apache.org" <us...@beam.apache.org>
> *Cc: *Yuchu Cao <yu...@trulia.com>
> *Subject: *Re: Perf issue with Beam on spark (spark runner)
>
>
>
> The DoFn you shared is simple enough that it seems unlikely to be the
> performance bottleneck here.
>
>
>
> Can you share more information about your complete pipeline? What other
> transforms are there? What sources/sinks are you using?
>
>
>
> On Tue, Jul 20, 2021 at 2:02 PM Tao Li <ta...@zillow.com> wrote:
>
> Hi Beam community,
>
>
>
> We are seeing a serious perf issue with beam using spark runner, compared
> with writing a native spark app. Can you please provide some help?
>
>
>
> The beam on spark app is taking 8-10 min, whereas a native spark app takes
> only 2 min. Below is the Spark UI, which shows that the flatMapToPair
> method is very time-consuming. Is this method call coming from the spark
> runner?
>
>
>
> <image001.png>
>
>
>
> I suspect this is caused by high GC time. See “GC Time” column below:
>
>
>
> <image002.png>
>
>
>
>
>
> The beam code is really simple, just per-row processing.
>
>
>
> public class CalcFn extends DoFn<Row, Row> {
>
>     protected Logger log = LoggerFactory.getLogger(this.getClass());
>
>     private Schema schema;
>
>
>
>     public CalcFn(Schema schema) {
>
>         this.schema = schema;
>
>     }
>
>
>
>     @ProcessElement
>
>     public void processElement(@Element Row row,OutputReceiver<Row>
> receiver) {
>
>        // Row row = ctx.element();
>
>         Long auction_value = (Long) row.getBaseValue("auction");
>
>         Long bid_value = (Long) row.getBaseValue("bidder");
>
>         Long price = (Long) row.getBaseValue("price");
>
>         Double euro = price * 0.82;
>
>
>
>         receiver.output( Row.withSchema(schema)
>
>                 .addValues(auction_value, euro, bid_value).build());
>
>     }
>
> }
>
>
>
>

Re: Perf issue with Beam on spark (spark runner)

Posted by Tao Li <ta...@zillow.com>.
Thanks Mike for this info!

From: Mike Kaplinskiy <mi...@ladderlife.com>
Reply-To: "user@beam.apache.org" <us...@beam.apache.org>
Date: Tuesday, September 7, 2021 at 2:15 PM
To: "user@beam.apache.org" <us...@beam.apache.org>
Cc: Alexey Romanenko <ar...@gmail.com>, Andrew Pilloud <ap...@google.com>, Ismaël Mejía <ie...@gmail.com>, Kyle Weaver <kc...@google.com>, Yuchu Cao <yu...@trulia.com>
Subject: Re: Perf issue with Beam on spark (spark runner)

A long time ago when I was experimenting with the Spark runner for a batch job, I noticed that a lot of time was spent in GC as well. In my case I narrowed it down to how the Spark runner implements Coders.

Spark's value prop is that it only serializes data when it truly has no other choice - i.e. when it needs to reclaim memory or when it sends things over the wire. Unfortunately due to the mismatch in serialization APIs between Beam and Spark, Beam's Spark runner actually just serializes things all the time. My theory was that the to/from byte array dance was slow. I attempted to fix this at https://github.com/apache/beam/pull/8371 but I could never actually reproduce a speedup in performance benchmarks.
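
For illustration, that byte-array dance amounts to an encode/decode cycle through a Beam Coder for each element. A minimal, self-contained sketch of the round trip (using StringUtf8Coder for simplicity; this illustrates the pattern, not the runner's actual internal code path):

import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.util.CoderUtils;

public class CoderRoundTrip {
    public static void main(String[] args) throws Exception {
        StringUtf8Coder coder = StringUtf8Coder.of();
        // Encode the element to a fresh byte array, as the runner does when
        // it hands data to Spark...
        byte[] bytes = CoderUtils.encodeToByteArray(coder, "some element");
        // ...and decode it back when Beam needs the element again. Doing this
        // per element allocates heavily, which shows up as GC time.
        String roundTripped = CoderUtils.decodeFromByteArray(coder, bytes);
        System.out.println(roundTripped);
    }
}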

If you're feeling up to it, you could try reviving something like that PR and see if it helps.

Mike.

Ladder <http://bit.ly/1VRtWfS>. The smart, modern way to insure your life.


On Sat, Aug 14, 2021 at 4:35 PM Tao Li <ta...@zillow.com>> wrote:
@Alexey Romanenko<ma...@gmail.com> I tried out ParquetIO splittable and the processing time improved from 10 min to 6 min, but still much longer than 2 min using a native spark app.

We are still seeing a lot of GC cost from the call stack below. Do you think this ticket can fix the issue https://issues.apache.org/jira/browse/BEAM-12646 ? Thanks.

[image: call-stack screenshot]



From: Tao Li <ta...@zillow.com>>
Reply-To: "user@beam.apache.org<ma...@beam.apache.org>" <us...@beam.apache.org>>
Date: Friday, August 6, 2021 at 11:12 AM
To: Alexey Romanenko <ar...@gmail.com>>
Cc: "user@beam.apache.org<ma...@beam.apache.org>" <us...@beam.apache.org>>, Andrew Pilloud <ap...@google.com>>, Ismaël Mejía <ie...@gmail.com>>, Kyle Weaver <kc...@google.com>>, Yuchu Cao <yu...@trulia.com>>
Subject: Re: Perf issue with Beam on spark (spark runner)

Thanks @Alexey Romanenko<ma...@gmail.com> please see my clarifications below.


| “Well, of course, if you read all fields (columns) then you don’t need column projection. Otherwise, it can give a quite significant performance boost, especially for large tables with many columns. “

[Tao] Basically my perf testing was comparing beam spark runner and native spark. In both the beam app and the native spark app, I was simply reading a parquet backed dataset and immediately saving it back to parquet. And we were seeing the beam app took 3-5 times longer than native spark. As I have shared in this thread previously, the call stack below from the spark runner was quite time-consuming.

[image: call-stack screenshot]



| "Legacy Read transform (non-SDF based Read) is used by default for non-FnAPI opensource runners. Use `use_sdf_read` experimental flag to re-enable SDF based Read transforms ([BEAM-10670](https://issues.apache.org/jira/browse/BEAM-10670<https://nam11.safelinks.protection.outlook.com/?url=https%3A%2F%2Fissues.apache.org%2Fjira%2Fbrowse%2FBEAM-10670&data=04%7C01%7Ctaol%40zillow.com%7Cfc203509e0994ed0fc8b08d9724473bc%7C033464830d1840e7a5883784ac50e16f%7C0%7C0%7C637666461187468581%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C3000&sdata=flZziWvOW842LjMedT6Zgj8Xj23tAE43pUxqGe43aIE%3D&reserved=0>))”

[Tao] We are not specifying `use_sdf_read` experimental flag in our beam app, so we are not using SDF translation.



From: Alexey Romanenko <ar...@gmail.com>>
Date: Friday, August 6, 2021 at 8:13 AM
To: Tao Li <ta...@zillow.com>>
Cc: "user@beam.apache.org<ma...@beam.apache.org>" <us...@beam.apache.org>>, Andrew Pilloud <ap...@google.com>>, Ismaël Mejía <ie...@gmail.com>>, Kyle Weaver <kc...@google.com>>, Yuchu Cao <yu...@trulia.com>>
Subject: Re: Perf issue with Beam on spark (spark runner)



On 5 Aug 2021, at 18:17, Tao Li <ta...@zillow.com>> wrote:

It was a great presentation!

Thanks!

Regarding my perf testing, I was not doing aggregation, filtering, projection or joining. I was simply reading all the fields of parquet and then immediately saving the PCollection back to parquet.

Well, of course, if you read all fields (columns) then you don’t need column projection. Otherwise, it can give a quite significant performance boost, especially for large tables with many columns.


Regarding SDF translation, is it enabled by default?

From Beam 2.30.0 release notes:

"Legacy Read transform (non-SDF based Read) is used by default for non-FnAPI opensource runners. Use `use_sdf_read` experimental flag to re-enable SDF based Read transforms ([BEAM-10670](https://issues.apache.org/jira/browse/BEAM-10670<https://nam11.safelinks.protection.outlook.com/?url=https%3A%2F%2Fissues.apache.org%2Fjira%2Fbrowse%2FBEAM-10670&data=04%7C01%7Ctaol%40zillow.com%7Cfc203509e0994ed0fc8b08d9724473bc%7C033464830d1840e7a5883784ac50e16f%7C0%7C0%7C637666461187468581%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C3000&sdata=flZziWvOW842LjMedT6Zgj8Xj23tAE43pUxqGe43aIE%3D&reserved=0>))”

—
Alexey

 I will check out ParquetIO splittable. Thanks!

From: Alexey Romanenko <ar...@gmail.com>>
Date: Thursday, August 5, 2021 at 6:40 AM
To: Tao Li <ta...@zillow.com>>
Cc: "user@beam.apache.org<ma...@beam.apache.org>" <us...@beam.apache.org>>, Andrew Pilloud <ap...@google.com>>, Ismaël Mejía <ie...@gmail.com>>, Kyle Weaver <kc...@google.com>>, Yuchu Cao <yu...@trulia.com>>
Subject: Re: Perf issue with Beam on spark (spark runner)

It’s very likely that Spark SQL may have much better performance because of SQL push-downs and avoiding additional ser/deser operations.

At the same time, did you try to leverage "withProjection()” in ParquetIO and project only the fields that you needed?

Did you use ParquetIO splittable (it's not enabled by default, fixed in [1])?
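
For reference, a minimal sketch of a projected, splittable read (the projection schema and path here are hypothetical, and the exact fluent calls may vary by Beam version):

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.beam.sdk.io.parquet.ParquetIO;

public class ProjectedParquetRead {
    public static ParquetIO.Read buildRead() {
        // Hypothetical projection schema: only the columns the query touches.
        Schema projection = SchemaBuilder.record("BidProjection").fields()
                .optionalLong("auction")
                .optionalLong("bidder")
                .optionalLong("price")
                .endRecord();

        return ParquetIO.read(projection)
                .from("s3://bucket/path/*.parquet")  // hypothetical path
                // The requested schema and the schema used to encode output records.
                .withProjection(projection, projection)
                // Opt in to the splittable read path (not enabled by default, see [1]).
                .withSplit();
    }
}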

Also, using SDF translation for Read on Spark Runner can cause performance degradation as well (we noticed that in our experiments). Try to use the non-SDF read (if you are not already) [2].
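
If you do need to toggle that flag, a sketch of setting the experiment programmatically (equivalent to passing --experiments=use_sdf_read on the command line; whether the flag applies depends on the Beam version):

import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class SdfReadFlag {
    public static PipelineOptions buildOptions(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
        // Re-enables the SDF-based Read translation; omit this to keep the
        // default (non-SDF) read recommended above.
        ExperimentalOptions.addExperiment(options.as(ExperimentalOptions.class), "use_sdf_read");
        return options;
    }
}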


PS: Yesterday, at Beam Summit, we (Ismael and I) gave a related talk. I’m not sure if a recording is available yet, but you can find the slides here [3]; they may be helpful.


—
Alexey

[1] https://issues.apache.org/jira/browse/BEAM-12070
[2] https://issues.apache.org/jira/browse/BEAM-10670
[3] https://drive.google.com/file/d/17rJC0BkxpFFL1abVL01c-D0oHvRRmQ-O/view?usp=sharing


On 5 Aug 2021, at 03:07, Tao Li <ta...@zillow.com>> wrote:

@Alexey Romanenko<ma...@gmail.com> @Ismaël Mejía<ma...@gmail.com> I assume you are experts on spark runner. Can you please take a look at this thread and confirm this jira covers the causes https://issues.apache.org/jira/browse/BEAM-12646 ?

This perf issue is currently a blocker to me..

Thanks so much!

From: Tao Li <ta...@zillow.com>>
Reply-To: "user@beam.apache.org<ma...@beam.apache.org>" <us...@beam.apache.org>>
Date: Friday, July 30, 2021 at 3:53 PM
To: Andrew Pilloud <ap...@google.com>>, "user@beam.apache.org<ma...@beam.apache.org>" <us...@beam.apache.org>>
Cc: Kyle Weaver <kc...@google.com>>, Yuchu Cao <yu...@trulia.com>>
Subject: Re: Perf issue with Beam on spark (spark runner)

Thanks everyone for your help.

We actually did another round of perf comparison between Beam (on spark) and native spark, without any projection/filtering in the query (to rule out the “predicate pushdown” factor).

The Beam job on the spark runner is still taking 3-5x the time of native spark, and the cause is https://issues.apache.org/jira/browse/BEAM-12646 according to the spark metrics. The spark runner is pretty much the bottleneck.

<image001.png>

From: Andrew Pilloud <ap...@google.com>>
Date: Thursday, July 29, 2021 at 2:11 PM
To: "user@beam.apache.org<ma...@beam.apache.org>" <us...@beam.apache.org>>
Cc: Tao Li <ta...@zillow.com>>, Kyle Weaver <kc...@google.com>>, Yuchu Cao <yu...@trulia.com>>
Subject: Re: Perf issue with Beam on spark (spark runner)

Actually, ParquetIO got pushdown in Beam SQL starting at v2.29.0.

Andrew

On Mon, Jul 26, 2021 at 10:05 AM Andrew Pilloud <ap...@google.com>> wrote:
Beam SQL doesn't currently have project pushdown for ParquetIO (we are working to expand this to more IOs). Using ParquetIO withProjection directly will produce better results.

On Mon, Jul 26, 2021 at 9:46 AM Robert Bradshaw <ro...@google.com>> wrote:
Could you try using Beam SQL [1] and see if that gives a result more similar to your Spark SQL query? I would also be curious if the performance is sufficient using withProjection [2] to read only the auction, price, and bidder columns.

[1] https://beam.apache.org/documentation/dsls/sql/overview/
[2] https://beam.apache.org/releases/javadoc/2.25.0/org/apache/beam/sdk/io/parquet/ParquetIO.Read.html#withProjection-org.apache.avro.Schema-org.apache.avro.Schema-
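
A minimal sketch of the equivalent Beam SQL query (assuming `table` is the PCollection<Row> produced by the ParquetIO read shown later in this thread):

import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

public class BidQuery {
    public static PCollection<Row> applyQuery(PCollection<Row> table) {
        // A single input PCollection is registered under the default table
        // name PCOLLECTION.
        return table.apply(SqlTransform.query(
                "SELECT auction, 0.82 * price AS euro, bidder FROM PCOLLECTION"));
    }
}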

On Sat, Jul 24, 2021 at 10:23 AM Tao Li <ta...@zillow.com>> wrote:
Thanks Robert for filing BEAM-12646. This perf issue is a blocker for us to adopt Beam. It would be great if the community could conclude the root cause and share an ETA for the fix. Thanks so much!


From: Robert Bradshaw <ro...@google.com>>
Date: Wednesday, July 21, 2021 at 3:51 PM
To: Tao Li <ta...@zillow.com>>
Cc: "user@beam.apache.org<ma...@beam.apache.org>" <us...@beam.apache.org>>, Kyle Weaver <kc...@google.com>>, Yuchu Cao <yu...@trulia.com>>
Subject: Re: Perf issue with Beam on spark (spark runner)

On Wed, Jul 21, 2021 at 3:00 PM Tao Li <ta...@zillow.com>> wrote:
@Robert Bradshaw<ma...@google.com> with the Spark API, the code is actually much simpler. We are just calling the spark SQL API against a hive table: spark.sql(“SELECT auction, 0.82*(price) as euro, bidder FROM bid”)

Good chance that this is pushing projection of those few fields up into the read operator, which could be a dramatic savings. You could try doing it manually in Beam, or use Beam SQL, which should do the same.


I think the “globally windowed GBK” optimization you are proposing is a good callout.

Filed https://issues.apache.org/jira/browse/BEAM-12646 to track.


From: Robert Bradshaw <ro...@google.com>>
Reply-To: "user@beam.apache.org<ma...@beam.apache.org>" <us...@beam.apache.org>>
Date: Wednesday, July 21, 2021 at 1:09 PM
To: user <us...@beam.apache.org>>
Cc: Kyle Weaver <kc...@google.com>>, Yuchu Cao <yu...@trulia.com>>
Subject: Re: Perf issue with Beam on spark (spark runner)

On Wed, Jul 21, 2021 at 12:51 PM Tao Li <ta...@zillow.com>> wrote:
Kyle, I don’t expect such a huge perf diff either. To your question, no, I am not specifying withProjection or withSplit for the parquet reader.

Are you doing so in your Spark code?

Below is my parquet read code:

// Match the input file pattern and expand it into individual readable files.
PCollection<FileIO.ReadableFile> files = pipeline
                .apply(FileIO.match().filepattern(beamRequiredPath))
                .apply(FileIO.readMatches());

// Read the Parquet records with the supplied Avro schema, then convert each
// GenericRecord into a Beam Row so downstream transforms can use Beam schemas.
PCollection<Row> table = files
                .apply(ParquetIO
                        .readFiles(avroSchema)
                        .withConfiguration(ImmutableMap.of(AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS, "false")))
                .apply(MapElements
                        .into(TypeDescriptors.rows())
                        .via(AvroUtils.getGenericRecordToRowFunction(AvroUtils.toBeamSchema(avroSchema))))
                .setCoder(RowCoder.of(AvroUtils.toBeamSchema(avroSchema)));


According to my investigation, it looks like the call stack below is very computation-intensive and causes a lot of GC time. The stack appears to come from the spark runner code.

This does look inordinately expensive. I wonder if it would make sense to optimize the globally windowed GBK as some other runners do.


<image001.png>

From: Kyle Weaver <kc...@google.com>>
Date: Tuesday, July 20, 2021 at 3:57 PM
To: Tao Li <ta...@zillow.com>>
Cc: "user@beam.apache.org<ma...@beam.apache.org>" <us...@beam.apache.org>>, Yuchu Cao <yu...@trulia.com>>
Subject: Re: Perf issue with Beam on spark (spark runner)

Beam has its own implementation of Parquet IO, and doesn't use Spark's. It's possible Spark's implementation does more optimizations, though perhaps not enough to result in such a dramatic difference.

I'm curious how your Parquet read is configured. In particular, if withProjection or withSplit are set.

On Tue, Jul 20, 2021 at 3:21 PM Tao Li <ta...@zillow.com>> wrote:
Hi Kyle,

The ParDo (which references the code I shared) is the only transformation in my pipeline. The input and output are parquet files in S3 (we are using beam ParquetIO).

From: Kyle Weaver <kc...@google.com>>
Reply-To: "user@beam.apache.org<ma...@beam.apache.org>" <us...@beam.apache.org>>
Date: Tuesday, July 20, 2021 at 2:13 PM
To: "user@beam.apache.org<ma...@beam.apache.org>" <us...@beam.apache.org>>
Cc: Yuchu Cao <yu...@trulia.com>>
Subject: Re: Perf issue with Beam on spark (spark runner)

The DoFn you shared is simple enough that it seems unlikely to be the performance bottleneck here.

Can you share more information about your complete pipeline? What other transforms are there? What sources/sinks are you using?

On Tue, Jul 20, 2021 at 2:02 PM Tao Li <ta...@zillow.com>> wrote:
Hi Beam community,

We are seeing a serious perf issue with beam using spark runner, compared with writing a native spark app. Can you please provide some help?

The beam on spark app is taking 8-10 min, whereas a native spark app takes only 2 min. Below is the Spark UI, which shows that the flatMapToPair method is very time-consuming. Is this method call coming from the spark runner?

<image001.png>

I suspect this is caused by high GC time. See “GC Time” column below:

<image002.png>


The beam code is really simple, just per-row processing.

public class CalcFn extends DoFn<Row, Row> {
    protected Logger log = LoggerFactory.getLogger(this.getClass());
    private Schema schema;

    public CalcFn(Schema schema) {
        this.schema = schema;
    }

    @ProcessElement
    public void processElement(@Element Row row, OutputReceiver<Row> receiver) {
        Long auction_value = (Long) row.getBaseValue("auction");
        Long bid_value = (Long) row.getBaseValue("bidder");
        Long price = (Long) row.getBaseValue("price");
        Double euro = price * 0.82;

        receiver.output(Row.withSchema(schema)
                .addValues(auction_value, euro, bid_value).build());
    }
}