Posted to user@beam.apache.org by Lydian <ly...@gmail.com> on 2023/04/18 17:57:20 UTC

How Beam Pipelines Handle Late Events

Hi,

We are using Beam (Python SDK + Flink Runner) to back up our streaming data
from Kafka to S3. To avoid hitting S3 request limits, we use a 1-minute fixed
window to group messages.  We have a similar pipeline in Spark that we want
to replace with this new pipeline.  However, the Beam pipeline always seems
to be missing events, which we suspect is due to late events (the number of
missing events drops when we use a higher allowed_lateness).

We've tried the following approaches to avoid late events, but neither of
them works:
1.  Use processing timestamps instead of event time. Ideally, if windowing
uses the processing timestamp, no event should be considered late.
But this doesn't seem to work at all.
2.  Configure allowed_lateness to 12 hours.  Since approach 1 did not work
as expected, we also configured allowed_lateness. But the output still has
missing events compared to our old Spark pipeline.

Here's the simplified code we have:
```
from typing import Any

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.utils.timestamp import Duration


def add_timestamp(event: Any) -> Any:
    import time

    from apache_beam import window

    # Stamp each element with the current wall-clock (processing) time.
    return window.TimestampedValue(event, time.time())


(pipeline
    | "Kafka Read" >> ReadFromKafka(
        consumer_config=consumer_config, topics=["test-topic"])
    | "Adding 'trigger_processing_time' timestamp" >> beam.Map(add_timestamp)
    | "Window into Fixed Intervals"
    >> beam.WindowInto(
        beam.window.FixedWindows(fixed_window_size),
        allowed_lateness=Duration(allowed_lateness),
    )
    | "Write to s3" >> beam.ParDo(WriteBatchesToS3(s3_path)))
```

I am wondering:
1. Does the add_timestamp approach correctly mark elements so that windowing
uses processing time?  If so, why are there still late events, considering we
are using processing time and not event time?
2.  Are there any other approaches to avoid dropping late events besides
`allowed_lateness`?  In Flink you can output late events as a side output;
can we do something similar in Beam?  Could someone provide a code example?
(A sketch of what we mean follows these questions.)

Could someone help us debug this?  Thanks!
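
For concreteness, here is a minimal sketch of the kind of answer we are
hoping for, based on our reading of the Beam trigger docs: a trigger with
late firings that should keep emitting elements that arrive within
allowed_lateness instead of dropping them. fixed_window_size and
allowed_lateness are reused from the snippet above; whether this actually
prevents our missing events is exactly what we are unsure about:
```
import apache_beam as beam
from apache_beam.transforms.trigger import (
    AccumulationMode,
    AfterCount,
    AfterWatermark,
)
from apache_beam.utils.timestamp import Duration

windowed = (
    events  # the PCollection produced by beam.Map(add_timestamp) above
    | "Window with late firings" >> beam.WindowInto(
        beam.window.FixedWindows(fixed_window_size),
        # Fire once when the watermark passes the end of the window,
        # then once more for each element that arrives late but still
        # within allowed_lateness, instead of silently dropping it.
        trigger=AfterWatermark(late=AfterCount(1)),
        accumulation_mode=AccumulationMode.DISCARDING,
        allowed_lateness=Duration(allowed_lateness),
    )
)
```
With DISCARDING accumulation, each late firing would emit only the newly
arrived elements, so the S3 writer would produce small extra files rather
than rewriting whole windows.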

---
* Flink's documentation on emitting late events as a side output:
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/windows/#getting-late-data-as-a-side-output


Sincerely,
Lydian Lee

Re: How Beam Pipelines Handle Late Events

Posted by Robert Bradshaw via user <us...@beam.apache.org>.
On Fri, Apr 21, 2023 at 3:37 AM Pavel Solomin <p....@gmail.com> wrote:
>
> Thank you for the information.
>
> I'm assuming your records have a unique ID, and that you observed some IDs missing from the Beam output compared with Spark, not just duplicates produced by Spark.
>
> If so, I would suggest creating a P1 issue at https://github.com/apache/beam/issues

+1, ideally with enough information to reproduce. As far as I
understand, what you have should just work (but I'm not a Flink
expert).


Re: How Beam Pipelines Handle Late Events

Posted by Pavel Solomin <p....@gmail.com>.
Thank you for the information.

I'm assuming your records have a unique ID, and that you observed some IDs
missing from the Beam output compared with Spark, not just duplicates
produced by Spark.

If so, I would suggest creating a P1 issue at
https://github.com/apache/beam/issues

Also, did you try setting --checkpointingMode=AT_LEAST_ONCE ?

Unfortunately, I can't be more helpful here, but let me share some of the
gotchas from my previous experience of running Beam on top of Flink for a
similar use case (landing data from a messaging system into files):

(1) https://github.com/apache/beam/issues/26041 - I solved that by adding a
runId, re-generated between app (re)starts, into the file names (see the
sketch after this list)

(2) I used processing-time watermarks and a simple window with no lateness
configured - combining this with (1) achieved no data loss
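
For illustration, a minimal sketch of the runId idea in (1) - the helper
name and the prefix layout here are illustrative, not the exact code I used:
```
import uuid


def make_output_prefix(s3_path: str) -> str:
    """Build a per-run output prefix for the S3 writer.

    The runId is re-generated every time the app (re)starts, so files
    written after a restart land under a fresh prefix and can never
    clobber files from the previous run; readers can de-duplicate
    across runs by keying on the runId component.
    """
    run_id = uuid.uuid4().hex
    return f"{s3_path}/run_id={run_id}"
```
The writer then receives make_output_prefix(s3_path), computed once at
pipeline construction time, instead of the bare s3_path.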

Best Regards,
Pavel Solomin

Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
<https://www.linkedin.com/in/pavelsolomin>

Re: How Beam Pipelines Handle Late Events

Posted by Lydian <ly...@gmail.com>.
Yes, we did enable this in our pipeline.

On Wed, Apr 19, 2023 at 5:00 PM Pavel Solomin <p....@gmail.com> wrote:

> Thank you
>
> Just to confirm: how did you configure Kafka offset commits? Did you have
> this flag enabled?
>
>
>
> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#isCommitOffsetsInFinalizeEnabled--
Sincerely,
Lydian Lee

Re: How Beam Pipelines Handle Late Events

Posted by Pavel Solomin <p....@gmail.com>.
Thank you

Just to confirm: how did you configure Kafka offset commits? Did you have
this flag enabled?


https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#isCommitOffsetsInFinalizeEnabled--
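
In the Python SDK's cross-language Kafka source this is exposed as the
commit_offset_in_finalize argument of ReadFromKafka. A minimal sketch of
what I mean - broker address, group id and topic are placeholders:
```
from apache_beam.io.kafka import ReadFromKafka

kafka_source = ReadFromKafka(
    consumer_config={
        "bootstrap.servers": "broker:9092",
        # A consumer group id is required for committing offsets.
        "group.id": "beam-s3-backup",
    },
    topics=["test-topic"],
    # Commit offsets back to Kafka only when a checkpoint finalizes
    # the read, instead of relying on the consumer's auto-commit.
    commit_offset_in_finalize=True,
)
```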



-- 
Best Regards,
Pavel Solomin

Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
<https://www.linkedin.com/in/pavelsolomin>

Re: How Beam Pipelines Handle Late Events

Posted by Trevor Burke <tr...@affirm.com>.
Hi Pavel,

Thanks for the reply.

No, the event losses are not consistent. While we've been running our
pipelines in parallel (Beam vs. Spark), we see some days with no event loss
and some days with a little, but it's always less than 0.05%.




-- 
Trevor Burke (he/him)   |   Software Engineer, Data Platform   |   415.794.4111

Re: How Beam Pipelines Handle Late Events

Posted by Pavel Solomin <p....@gmail.com>.
Hello Lydian,

Do you always observe data loss? Or does it happen only when you restart
your pipeline from a Flink savepoint? If you lose data only between
restarts, is your issue similar to
https://github.com/apache/beam/issues/26041 ?

Best Regards,
Pavel Solomin

Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
<https://www.linkedin.com/in/pavelsolomin>




