Posted to user@beam.apache.org by "Sunny, Mani Kolbe" <Su...@DNB.com> on 2020/06/24 11:22:01 UTC

How to create a marker file when each window completes?

Hello,

We are developing a Beam pipeline which runs on the SparkRunner in streaming mode. The pipeline reads from Kinesis, does some translation and filtering, and finally writes the output to S3 using the AvroIO writer. We are using fixed windows with triggers based on element count and processing-time intervals. The output path is partitioned by the window start timestamp, and allowedLateness=0sec.

This is all working fine. But to support some legacy downstream services, we need to ensure that each output partition has a marker file to indicate that the data is complete and ready for downstream consumption, something like Hadoop's .SUCCESS file or a .COMPLETE file. Is there a way to create such a marker file in Beam on a window-closing event?

Regards,
Mani
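The windowing and triggering setup described above might look roughly like this in Beam's Java SDK (a sketch, not the poster's actual code; the element-count and processing-time thresholds are hypothetical placeholders):

```java
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.transforms.windowing.AfterFirst;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

// 'records' is the translated/filtered stream read from Kinesis; the
// 10,000-element and 1-minute thresholds are hypothetical placeholders.
PCollection<GenericRecord> windowed = records.apply(
    Window.<GenericRecord>into(FixedWindows.of(Duration.standardMinutes(10)))
        .triggering(Repeatedly.forever(AfterFirst.of(
            AfterPane.elementCountAtLeast(10_000),            // element-count trigger
            AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(Duration.standardMinutes(1)))))  // processing-time trigger
        .withAllowedLateness(Duration.ZERO)                   // allowedLateness = 0 sec
        .discardingFiredPanes());
// 'windowed' would then feed AvroIO.writeGenericRecords(...).withWindowedWrites()
// with a filename policy keyed on the window start timestamp.
```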

Re: How to create a marker file when each window completes?

Posted by Luke Cwik <lc...@google.com>.
You can use @OnWindowExpiration with a stateful DoFn that consumes the
PCollection<WriteFilesResult> (you'll need to convert the PCollection to a
keyed PCollection, which you can do with WithKeys.of(null)). The
window-expiration callback will only be invoked once all upstream
processing for that window has completed.
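A rough sketch of this suggestion (hedged: the state cell and the writeMarker helper are hypothetical additions, and, per the caveat elsewhere in the thread, @OnWindowExpiration may not yet be supported on the SparkRunner):

```java
import org.apache.beam.sdk.io.WriteFilesResult;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.values.KV;

// 'results' is the PCollection<WriteFilesResult<Void>> produced by the file write.
results
    .apply(WithKeys.of((Void) null))  // stateful DoFns require a keyed input
    .apply(ParDo.of(new DoFn<KV<Void, WriteFilesResult<Void>>, Void>() {
      // A state cell is declared so the DoFn is stateful and gets
      // per-window state and expiration semantics.
      @StateId("sawOutput")
      private final StateSpec<ValueState<Boolean>> sawOutput = StateSpecs.value();

      @ProcessElement
      public void process(@StateId("sawOutput") ValueState<Boolean> state) {
        state.write(true);  // record that this window produced output files
      }

      @OnWindowExpiration
      public void onExpiration(BoundedWindow window) {
        // Invoked once all upstream processing for the window has completed.
        IntervalWindow w = (IntervalWindow) window;
        writeMarker(w.start());  // hypothetical helper: create the .COMPLETE
                                 // file under this window's partition path
      }
    }));
```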

RE: How to create a marker file when each window completes?

Posted by "Sunny, Mani Kolbe" <Su...@DNB.com>.
Let's say I set FixedWindows.of(Duration.standardMinutes(10)).
Since my event time is determined by WithTimestamps.of(Instant.now()), I can safely assume the window is closed once that 10-minute period has passed. But how do I ensure that all records belonging to that window have already been flushed to disk, so that I can safely create a .COMPLETE flag?
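For what it's worth, once the completing window's start time is known (e.g. inside a completion callback), creating the marker itself is straightforward. A minimal local-filesystem sketch, assuming a hypothetical yyyy/MM/dd/HH/mm partition layout (writing to S3 would need an S3 client instead of java.nio):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/** Creates an empty .COMPLETE marker in the partition directory for a window start. */
public class MarkerFiles {
  // Hypothetical partition layout; match it to your actual output path scheme.
  private static final DateTimeFormatter PARTITION_FMT =
      DateTimeFormatter.ofPattern("yyyy/MM/dd/HH/mm").withZone(ZoneOffset.UTC);

  public static Path writeMarker(Path outputRoot, Instant windowStart) throws IOException {
    Path partition = outputRoot.resolve(PARTITION_FMT.format(windowStart));
    Files.createDirectories(partition);  // no-op if the writers already created it
    Path marker = partition.resolve(".COMPLETE");
    if (!Files.exists(marker)) {
      Files.createFile(marker);  // zero-byte marker, like Hadoop's success file
    }
    return marker;
  }
}
```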

Re: How to create a marker file when each window completes?

Posted by Luke Cwik <lc...@google.com>.
Knowing when a window is "closed" is based upon having the watermark
advance, which is based upon event time.

RE: How to create a marker file when each window completes?

Posted by "Sunny, Mani Kolbe" <Su...@DNB.com>.
Hi Luke,

Sorry, I forgot to mention: we override the event timestamp to the current time using WithTimestamps.of(Instant.now()), as we don't really care about the actual event time. So a FixedWindow closes when the current time passes the window's end time.
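(As a side note, WithTimestamps.of actually takes a per-element SerializableFunction<T, Instant> rather than an Instant value, so stamping elements with processing time looks roughly like this sketch, where MyEvent is a placeholder for the pipeline's actual element type:)

```java
import org.apache.beam.sdk.transforms.WithTimestamps;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Instant;

// WithTimestamps.of takes a function evaluated per element, so "use processing
// time as the event time" means returning Instant.now() for every element.
// MyEvent is a placeholder for the pipeline's actual element type.
PCollection<MyEvent> stamped =
    events.apply(WithTimestamps.of((MyEvent e) -> Instant.now()));
```

(Even with these timestamps, the window only "closes" when the source's watermark advances past the window end.)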

It is standard practice in the Oozie world to trigger downstream jobs based on marker files, so I thought someone might have encountered this problem before me.

Regards,
Mani

Re: How to create a marker file when each window completes?

Posted by Luke Cwik <lc...@google.com>.
What do you consider complete? (I ask this since you are using element
count and processing time triggers)

Generally the idea is that you can feed the output
PCollection<WriteFilesResult> to a stateful DoFn with
an @OnWindowExpiration setup, but this works only if your completeness is
controlled by watermark advancement. Note that @OnWindowExpiration is new
to Beam, so it may not yet work with Spark.
