Posted to user@beam.apache.org by Eddy G <ka...@gmail.com> on 2019/10/08 14:43:48 UTC

Beam discarding massive amount of events due to Window object or inner processing

I've recently been developing a Beam (Dataflow) consumer which reads from a PubSub subscription and writes out Parquet files combining all the objects grouped within the same window.

While I was testing this without a heavy load, everything seemed to work fine.

However, after performing some heavy testing I can see that of 1,000,000 events sent to that PubSub queue, only 1,000 make it to Parquet!

According to the wall times across the different stages, the one which parses the events prior to applying the window seems to last 58 minutes. The last stage, which writes to the Parquet files, lasts 1h and 32 minutes.

I will now show the most relevant parts of the code; I hope you can shed some light on whether this is due to the logic that comes before the Window object definition or to the Window object itself.

pipeline
        .apply("Reading PubSub Events",
            PubsubIO.readMessagesWithAttributes()
                .fromSubscription(options.getSubscription()))
        .apply("Map to AvroSchemaRecord (GenericRecord)",
            ParDo.of(new PubsubMessageToGenericRecord()))
        .setCoder(AvroCoder.of(AVRO_SCHEMA))
        .apply("15m window",
            Window.<GenericRecord>into(FixedWindows.of(Duration.standardMinutes(15)))
                .triggering(AfterProcessingTime
                    .pastFirstElementInPane()
                    .plusDelayOf(Duration.standardSeconds(1)))
                .withAllowedLateness(Duration.ZERO)
                .accumulatingFiredPanes()
        )

Also note that I'm running Beam 2.9.0.

Tried moving the logic after the Window definition but still, most messages don't make it to the Parquet file.

Could the logic inside the second stage be too heavy, so that messages arrive too late and get discarded by the Window? The logic basically consists of reading the payload and parsing it into a POJO (reading inner Map attributes, filtering and such).

However, if I send a million events to PubSub, all of those million events make it to the Parquet write-to-file stage, but when I read those Parquet files in Spark and check the records, they aren't complete. Does that make sense?


Re: Beam discarding massive amount of events due to Window object or inner processing

Posted by Tim Sell <ts...@google.com>.
I think everyone who followed this thread learned something! I know I did.

Thanks for asking these questions. The summary and code snippets were just
the right length to be accessible and focussed.

On Wed, 16 Oct 2019, 06:04 Eddy G, <ka...@gmail.com> wrote:

> Thank you so so much guys for the amazing feedback you're giving me!
>
> I'm applying all of it and deep diving into more detail to see where I
> could go from there, so I can still get the pipeline performance way
> better.
>
> Again, really appreciated guys, you are amazing.
>

Re: Beam discarding massive amount of events due to Window object or inner processing

Posted by Eddy G <ka...@gmail.com>.
Thank you so so much guys for the amazing feedback you're giving me!

I'm applying all of it and deep diving into more detail to see where I could go from there, so I can still get the pipeline performance way better.

Again, really appreciated guys, you are amazing.

Re: Beam discarding massive amount of events due to Window object or inner processing

Posted by Tim Sell <ts...@google.com>.
You're getting 1 shard per pane, and you get a pane every time the trigger
fires on an early firing, and then another one for the final on-time pane.
To have 1 file with 1 shard for every 15-minute window you need to fire
only on window close, i.e. AfterWatermark.pastEndOfWindow() without early
firings.
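
For illustration, a rough sketch of what that window transform could look
like with a single firing at window close (reusing the FixedWindows, the
zero allowed lateness and the accumulation mode from the snippet earlier in
this thread):

    .apply("15m window",
        Window.<GenericRecord>into(FixedWindows.of(Duration.standardMinutes(15)))
            // Single on-time firing when the watermark passes the end of the
            // 15-minute window; no early firings, so one pane (and, with
            // withNumShards(1), one shard) per window.
            .triggering(AfterWatermark.pastEndOfWindow())
            .withAllowedLateness(Duration.ZERO)
            .accumulatingFiredPanes())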

On Mon, 14 Oct 2019, 14:35 Eddy G, <ka...@gmail.com> wrote:

> Thanks a lot everyone for all your valuable feedback!
>
> Just updated my code, made some minor refactoring and it seems to be
> working like a charm. Still some data is being dropped due to lateness
> (but I'm talking about 100 elements per 2 million, so no "big deal" there;
> I will look into extending the allowed lateness and the overall
> performance bits that I'm missing out on).
>
> A thing that worries me a lot is that the wall time has been exponentially
> increasing, up to 1 day and 3 hours, in the stage that is in charge of
> writing all that captured data into Parquet files, presumably due to the
> .parquet file writing code.
>
> I suppose that this is also the reason why I still get tons of small
> Parquet files within the same bucket, as I should only have, in a perfect
> scenario, 4 files (1 every 15 minutes, given the Window length), whereas
> I'm currently getting 60+!
>
>             .apply("Write .parquet File(s)",
>                 FileIO
>                     .<String, GenericRecord>writeDynamic()
>                     .by((SerializableFunction<GenericRecord, String>)
> event -> {
>                         // specify partitioning here
>                     })
>                     .via(ParquetIO.sink(AVRO_SCHEMA))
>                     .to(options.getOutputDirectory())
>                     .withNaming(type -> ParquetFileNaming.getNaming(...))
>                     .withDestinationCoder(StringUtf8Coder.of())
>                     .withNumShards(1) // should this be 0? Could this
> imply increasing of costs if set to 0?
>

Re: Beam discarding massive amount of events due to Window object or inner processing

Posted by Eddy G <ka...@gmail.com>.
Thanks a lot everyone for all your valuable feedback!

Just updated my code, made some minor refactoring and it seems to be working like a charm. Still some data is being dropped due to lateness (but I'm talking about 100 elements per 2 million, so no "big deal" there; I will look into extending the allowed lateness and the overall performance bits that I'm missing out on).

A thing that worries me a lot is that the wall time has been exponentially increasing, up to 1 day and 3 hours, in the stage that is in charge of writing all that captured data into Parquet files, presumably due to the .parquet file writing code.

I suppose that this is also the reason why I still get tons of small Parquet files within the same bucket, as I should only have, in a perfect scenario, 4 files (1 every 15 minutes, given the Window length), whereas I'm currently getting 60+!

            .apply("Write .parquet File(s)",
                FileIO
                    .<String, GenericRecord>writeDynamic()
                    .by((SerializableFunction<GenericRecord, String>) event -> {
                        // specify partitioning here
                    })
                    .via(ParquetIO.sink(AVRO_SCHEMA))
                    .to(options.getOutputDirectory())
                    .withNaming(type -> ParquetFileNaming.getNaming(...))
                    .withDestinationCoder(StringUtf8Coder.of())
                    .withNumShards(1) // should this be 0? Could this imply increasing of costs if set to 0?

Re: Beam discarding massive amount of events due to Window object or inner processing

Posted by Reza Rokni <re...@google.com>.
Hi,

When inserting into PubSub, can you set message metadata with the timestamp
from the event? If yes, then you can make use of:

https://beam.apache.org/releases/javadoc/2.16.0/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.Read.html#withTimestampAttribute-java.lang.String-
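
For instance, a minimal sketch, assuming the publisher sets an attribute
(here hypothetically named "eventTimestamp") on every message:

    PubsubIO.readMessagesWithAttributes()
        .fromSubscription(options.getSubscription())
        // Use this message attribute as the element's event timestamp. Its
        // value must be either an RFC 3339 timestamp or the number of
        // milliseconds since the Unix epoch.
        .withTimestampAttribute("eventTimestamp")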

Cheers

Reza

On Wed, 9 Oct 2019 at 16:31, Eddy G <ka...@gmail.com> wrote:

> Thanks a lot for the quick response!
>
> I can recall having already played with this when I first deployed this
> consumer and couldn't get around the following issue that I'm getting now
> again...
>
> java.lang.IllegalArgumentException: Cannot output with timestamp
> 2019-10-09T03:12:04.250Z. Output timestamps must be no earlier than the
> timestamp of the current input (2019-10-09T03:12:04.292Z) minus the allowed
> skew (0 milliseconds). See the DoFn#getAllowedTimestampSkew() Javadoc for
> details on changing the allowed skew.
>
> How can I manage the skew? Wouldn't it keep increasing, as is happening
> with the current version, which uses processing time?
>
> The timestamp that I'm inferring comes straight from the JSON object
> (which is the one I'm looking to use) and not from PubSub itself.
>



Re: Beam discarding massive amount of events due to Window object or inner processing

Posted by Eddy G <ka...@gmail.com>.
Thanks a lot for the quick response!

I can recall having already played with this when I first deployed this consumer and couldn't get around the following issue that I'm getting now again...

java.lang.IllegalArgumentException: Cannot output with timestamp 2019-10-09T03:12:04.250Z. Output timestamps must be no earlier than the timestamp of the current input (2019-10-09T03:12:04.292Z) minus the allowed skew (0 milliseconds). See the DoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed skew.

How can I manage the skew? Wouldn't it keep increasing, as is happening with the current version, which uses processing time?

The timestamp that I'm inferring comes straight from the JSON object (which is the one I'm looking to use) and not from PubSub itself.
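
For context, a rough sketch of the pattern behind that error (the DoFn name,
the 1-hour skew value and the extractEventTime helper are hypothetical, not
the actual PubsubMessageToGenericRecord):

    static class AssignEventTimestampFn extends DoFn<PubsubMessage, PubsubMessage> {

        // Allow output timestamps up to 1 hour earlier than the input element's
        // timestamp. The Javadoc marks this as deprecated because it permits
        // emitting elements that may already be late for their window.
        @Override
        public Duration getAllowedTimestampSkew() {
            return Duration.standardHours(1);
        }

        @ProcessElement
        public void processElement(ProcessContext c) {
            // Hypothetical helper that parses the event time out of the JSON payload.
            Instant eventTime = extractEventTime(c.element());
            c.outputWithTimestamp(c.element(), eventTime);
        }
    }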

Re: Beam discarding massive amount of events due to Window object or inner processing

Posted by Kenneth Knowles <ke...@apache.org>.
This is an unfortunate usability problem with triggers where you can
accidentally close the window and drop all data. I think instead, you
probably want this trigger:

  Repeatedly.forever(
      AfterProcessingTime
          .pastFirstElementInPane()
          .plusDelayOf(Duration.standardSeconds(1)))

The way I recommend expressing this trigger is:

    AfterWatermark.pastEndOfWindow().withEarlyFirings(
        AfterProcessingTime
            .pastFirstElementInPane()
            .plusDelayOf(Duration.standardSeconds(1)))

In the second case it is impossible to accidentally "close" the window and
drop all data.
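
Applied to the snippet quoted below, that could look roughly like this (a
sketch keeping the original window size, the zero allowed lateness and the
accumulation mode):

    .apply("15m window",
        Window.<GenericRecord>into(FixedWindows.of(Duration.standardMinutes(15)))
            .triggering(
                // On-time firing when the watermark passes the end of the
                // window, plus speculative early firings while data arrives.
                AfterWatermark.pastEndOfWindow()
                    .withEarlyFirings(
                        AfterProcessingTime
                            .pastFirstElementInPane()
                            .plusDelayOf(Duration.standardSeconds(1))))
            .withAllowedLateness(Duration.ZERO)
            .accumulatingFiredPanes())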

Kenn

On Tue, Oct 8, 2019 at 7:43 AM Eddy G <ka...@gmail.com> wrote:

> I've recently been developing a Beam (Dataflow) consumer which reads from
> a PubSub subscription and writes out Parquet files combining all the
> objects grouped within the same window.
>
> While I was doing testing of this without a huge load everything seemed to
> work fine.
>
> However, after performing some heavy testing I can see that of 1,000,000
> events sent to that PubSub queue, only 1,000 make it to Parquet!
>
> According to the wall times across the different stages, the one which
> parses the events prior to applying the window seems to last 58 minutes.
> The last stage, which writes to the Parquet files, lasts 1h and 32 minutes.
>
> I will now show the most relevant parts of the code; I hope you can shed
> some light on whether this is due to the logic that comes before the
> Window object definition or to the Window object itself.
>
> pipeline
>         .apply("Reading PubSub Events",
>             PubsubIO.readMessagesWithAttributes()
>                 .fromSubscription(options.getSubscription()))
>         .apply("Map to AvroSchemaRecord (GenericRecord)",
>             ParDo.of(new PubsubMessageToGenericRecord()))
>         .setCoder(AvroCoder.of(AVRO_SCHEMA))
>         .apply("15m window",
>             Window.<GenericRecord>into(FixedWindows.of(Duration.standardMinutes(15)))
>                 .triggering(AfterProcessingTime
>                     .pastFirstElementInPane()
>                     .plusDelayOf(Duration.standardSeconds(1)))
>                 .withAllowedLateness(Duration.ZERO)
>                 .accumulatingFiredPanes()
>         )
>
> Also note that I'm running Beam 2.9.0.
>
> Tried moving the logic after the Window definition but still, most
> messages don't make it to the Parquet file.
>
> Could the logic inside the second stage be too heavy, so that messages
> arrive too late and get discarded by the Window? The logic basically
> consists of reading the payload and parsing it into a POJO (reading inner
> Map attributes, filtering and such).
>
> However, if I send a million events to PubSub, all of those million events
> make it to the Parquet write-to-file stage, but when I read those Parquet
> files in Spark and check the records, they aren't complete. Does that make
> sense?
>
>