Posted to dev@beam.apache.org by "Koprivica,Preston Blake" <Pr...@cerner.com> on 2019/10/23 18:35:55 UTC

FileIO and windowed writes

Hi guys,

I’m currently working on a simple system that ingests data from a realtime stream (in this case Amazon SQS) and writes the output incrementally to a durable filesystem (i.e. S3). It’s easy to think of this as a low-fi journaling system. We need to make sure that data written to the source queue eventually makes it to S3. We are using the FileIO windowed writes with a custom naming policy to partition the files by their event time. Because SQS can’t guarantee order, we have to allow late messages. Moreover, we need a further guarantee that a message is written in a timely manner; we’re thinking some constant multiple of the windowing duration. As a first pass, we were thinking of a processing-time-based trigger that fires at a regular interval. For context, here’s an example of the pipeline:

ReadSQS -> Window + Processing Time Trigger -> Convert to Avro Message -> Write to Avro

  pipeline
        .apply(SqsIO.read().withQueueUrl(options.getQueueUrl()))
        // No WindowFn is set here, so elements stay in the global window.
        .apply(
            Window.<Message>configure()
                .discardingFiredPanes()
                // Fire a pane windowDur (processing time) after its first element arrives.
                .triggering(
                    Repeatedly.forever(
                        AfterProcessingTime.pastFirstElementInPane().plusDelayOf(windowDur))))
        .apply(ParDo.of(new SqsMsgToAvroDoFn<>(recordClass, options)))
        .setCoder(AvroCoder.of(recordClass))
        // Windowed write with a fixed shard count and a custom filename policy.
        .apply(
            AvroIO.write(recordClass)
                .withWindowedWrites()
                .withTempDirectory(options.getTempDir())
                .withNumShards(options.getShards())
                .to(new WindowedFilenamePolicy(options.getOutputPrefix(), "avro")));

This all seemed fairly straightforward. I have not yet observed lost data with this pipeline, but I am seeing an issue with timeliness. Things seem to get hung up on finalizing file output, but I have yet to pinpoint the cause. To highlight the issue, I can set up a test where I send a single message to the source queue. If nothing else happens, the data never makes it to its final output using the FlinkRunner (beam-2.15.0, flink-1.8). Has anyone seen this behavior before? Is the expectation of eventual consistency wrong?
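
For reference, the timing one would expect from the trigger in the pipeline above can be modeled in plain Java. This is only a sketch under stated assumptions, not Beam's implementation: `ProcessingTimeTriggerModel` and `firingTimes` are hypothetical names, and the model assumes that each pane fires `windowDur` after its first element arrives and is then discarded. Under that model, a single message arriving at t0 should produce exactly one pane at t0 + windowDur.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Simplified model (an assumption, not Beam code) of a repeated processing-time trigger. */
final class ProcessingTimeTriggerModel {

  /**
   * Returns the processing times at which panes would fire, given element
   * arrival times in ascending order and the trigger delay.
   */
  static List<Instant> firingTimes(List<Instant> arrivals, Duration delay) {
    List<Instant> firings = new ArrayList<>();
    Instant nextFiring = null; // null means the current pane is still empty
    for (Instant arrival : arrivals) {
      if (nextFiring != null && !arrival.isBefore(nextFiring)) {
        // The pending pane fired before this element arrived; start a new pane.
        firings.add(nextFiring);
        nextFiring = null;
      }
      if (nextFiring == null) {
        // First element of a pane: schedule the firing one delay later.
        nextFiring = arrival.plus(delay);
      }
    }
    if (nextFiring != null) {
      // The last non-empty pane still fires, even if nothing else arrives.
      firings.add(nextFiring);
    }
    return firings;
  }

  public static void main(String[] args) {
    Instant t0 = Instant.parse("2019-10-23T18:00:00Z");
    Duration windowDur = Duration.ofMinutes(5); // hypothetical value
    // Single-message test: one firing is expected, at t0 + windowDur.
    System.out.println(firingTimes(Arrays.asList(t0), windowDur));
  }
}
```

In this model the single-message test yields one firing at 18:05:00Z, which is the timeliness expectation the question above is about.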

Thanks,
-Preston




CONFIDENTIALITY NOTICE This message and any included attachments are from Cerner Corporation and are intended only for the addressee. The information contained in this message is confidential and may constitute inside or non-public information under international, federal, or state securities laws. Unauthorized forwarding, printing, copying, distribution, or use of such information is strictly prohibited and may be unlawful. If you are not the addressee, please promptly delete this message and notify the sender of the delivery error by e-mail or you may call Cerner's corporate offices in Kansas City, Missouri, U.S.A at (+1) (816)221-1024.

Re: FileIO and windowed writes

Posted by "Koprivica,Preston Blake" <Pr...@cerner.com>.
I currently only have quick access to test the DirectRunner and the FlinkRunner.  This only manifests in the FlinkRunner.

From: Reuven Lax <re...@google.com>
Reply-To: "dev@beam.apache.org" <de...@beam.apache.org>
Date: Wednesday, October 23, 2019 at 4:14 PM
To: dev <de...@beam.apache.org>
Subject: Re: FileIO and windowed writes

Is this only in the Flink runner?


Re: FileIO and windowed writes

Posted by Reuven Lax <re...@google.com>.
Is this only in the Flink runner?


Re: FileIO and windowed writes

Posted by "Koprivica,Preston Blake" <Pr...@cerner.com>.
I’ve tried different windowing functions and all result in the same behavior. The one in the previous email used the global window and a repeated processing-time-based trigger. The filename policy used real system time to timestamp the outgoing files.

Here are a couple of other window+trigger combos I’ve tried (with window-based filename strategies):

    Window.<Message>into(FixedWindows.of(windowDur))
        .withAllowedLateness(Duration.standardHours(24))
        .discardingFiredPanes()
        .triggering(
            Repeatedly.forever(
                AfterProcessingTime.pastFirstElementInPane().plusDelayOf(windowDur)))
i.e. a fixed window with a processing-time-based trigger + ~infinite lateness
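
To make "window-based filename strategies" concrete, here is a minimal plain-Java sketch of how an event timestamp maps to an epoch-aligned fixed window and then to a file name. Everything in it is an assumption for illustration: the `WindowedFilenamePolicy` used in this thread is not shown, so the class `FixedWindowNaming`, its methods, and the name layout (`prefix/start_end-shard-of-numShards.suffix`) are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

/** Hypothetical helper: derives a per-window file name from an event timestamp. */
final class FixedWindowNaming {
  private static final DateTimeFormatter FMT =
      DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH-mm-ss", Locale.ROOT)
          .withZone(ZoneOffset.UTC);

  /** Start of the epoch-aligned fixed window of the given size that contains eventTime. */
  static Instant windowStart(Instant eventTime, Duration size) {
    long sizeMs = size.toMillis();
    long ts = eventTime.toEpochMilli();
    // floorMod keeps the alignment correct for timestamps before the epoch too.
    return Instant.ofEpochMilli(ts - Math.floorMod(ts, sizeMs));
  }

  /** Builds a file name that encodes the window bounds and the shard (assumed layout). */
  static String filename(
      String prefix, Instant eventTime, Duration size, int shard, int numShards, String suffix) {
    Instant start = windowStart(eventTime, size);
    Instant end = start.plus(size);
    return String.format(
        Locale.ROOT,
        "%s/%s_%s-%05d-of-%05d.%s",
        prefix, FMT.format(start), FMT.format(end), shard, numShards, suffix);
  }

  public static void main(String[] args) {
    Instant eventTime = Instant.parse("2019-10-23T18:35:55Z");
    // Hypothetical settings: 5-minute windows, 2 shards, "out" as the output prefix.
    System.out.println(filename("out", eventTime, Duration.ofMinutes(5), 0, 2, "avro"));
  }
}
```

With these assumed settings, an element stamped 18:35:55Z lands in the window [18:35:00Z, 18:40:00Z), so late elements for that window would map to the same window bounds in the file name.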


    pipeline
        .apply(
            format("ReadSQS(%s)", options.getQueueUrl()),
            SqsIO.read().withQueueUrl(options.getQueueUrl()))
        .apply(WithTimestamps.of((Message m) -> Instant.now()))
        .apply(
            format("Window(%s)", options.getWindowDuration()),
            Window.into(FixedWindows.of(windowDur)))
i.e. map event time to processing time and use the default trigger (i.e. close of window, no lateness)

These all resulted in the same behavior – data gets hung up in a temp file somewhere and the finalize file logic never seems to run.

Thanks,
-Preston

From: Reuven Lax <re...@google.com>
Reply-To: "dev@beam.apache.org" <de...@beam.apache.org>
Date: Wednesday, October 23, 2019 at 2:35 PM
To: dev <de...@beam.apache.org>
Subject: Re: FileIO and windowed writes

What WindowFn are you using?


Re: FileIO and windowed writes

Posted by Reuven Lax <re...@google.com>.
What WindowFn are you using?
