You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@beam.apache.org by peay <pe...@protonmail.com> on 2017/04/22 18:15:38 UTC

Using watermarks with bounded sources

Hello,

A use case I find myself running into frequently is the following: I have daily or hourly files, and a Beam pipeline with a small to moderate size windows. (Actually, I've just seen that support for per-window files support in file based sinks was recently checked in, which is one way to get there).

Now, Beam has no clue about the fact that each file corresponds to a given time interval. My understanding is that when running the pipeline in batch mode with a bounded source, there is no notion watermark and we have to load everything because we just don't know. This is pretty wasteful, especially as you have to keep a lot of data in memory, while you could in principle operate close to what you'd do in streaming mode: first read the oldest files, then newest files, moving the watermark forward as you go through the input list of files.

I see one way around this. Let's say that I have hourly files and let's not assume anything about the order of records within the file to keep it simple: I don't want a very precise record-level watermark, but more a rough watermark at the granularity of hours. Say we can easily get the corresponding time interval from the filename. One can make an unbounded source that essentially acts as a "List of bounded file-based sources". If there are K splits, split k can read every file that has `index % K == k` in the time-ordered list of files. `advance` can advance the current file, and move on to the next one if no records were read.

However, as far as I understand, this pipeline will never terminate since this is an unbounded source and having the `advance` method of our wrapping source return `false` won't make the pipeline terminate. Can someone confirm if this is correct? If yes, what would be ways to work around that? There's always the option to throw to make the pipeline fail, but this is far from ideal.

Thanks,

Re: Using watermarks with bounded sources

Posted by Eugene Kirpichov <ki...@google.com>.
Shading issues have been fixed. I believe it should be all good to use now.

On Wed, Jun 21, 2017 at 2:24 PM Eugene Kirpichov <ki...@google.com>
wrote:

> Yes, there's a pending PR https://github.com/apache/beam/pull/3360
>
> Note that there are some shading issues with the Dataflow streaming runner
> support. It passes ValidatesRunner tests, but will likely fail in a real
> job :-| I'm working on resolving this with +Kenn Knowles <kl...@google.com>
> right now.
>
> On Wed, Jun 21, 2017 at 2:18 PM peay <pe...@protonmail.com> wrote:
>
>> Thanks, great news! Are there plans to get ProcessContinuation from the
>> original proposal into the API?
>>
>> -------- Original Message --------
>> Subject: Re: Using watermarks with bounded sources
>> Local Time: June 20, 2017 7:52 PM
>> UTC Time: June 20, 2017 11:52 PM
>> From: kirpichov@google.com
>>
>> To: peay <pe...@protonmail.com>, klk@google.com <kl...@google.com>
>> user@beam.apache.org <us...@beam.apache.org>
>>
>> Hi!
>>
>> The PR just got submitted. You can play with SDF in Dataflow streaming
>> runner now :) Hope it doesn't get rolled back (fingers crossed)...
>>
>> On Mon, Jun 19, 2017 at 6:06 PM Eugene Kirpichov <ki...@google.com>
>> wrote:
>>
>>> Hi,
>>> The PR is ready and I'm just struggling with setup of tests - Dataflow
>>> ValidatesRunner tests currently don't have a streaming execution.
>>> I think +Kenn Knowles <kl...@google.com> was doing something about that,
>>> or I might find a workaround.
>>>
>>> But basically if you want to experiment - if you patch in the PR, you
>>> can experiment with SDF in Dataflow in streaming mode. It passes tests
>>> against the current production Dataflow Service.
>>>
>>>
>>> On Thu, Jun 15, 2017 at 8:54 AM peay <pe...@protonmail.com> wrote:
>>>
>>>> Eugene, would you have an ETA on when splittable DoFn would be
>>>> available in Dataflow in batch/streaming mode? I see that
>>>> https://github.com/apache/beam/pull/1898 is still active
>>>>
>>>> I've started to experiment with those using the DirectRunner and this
>>>> is a great API.
>>>>
>>>> Thanks!
>>>>
>>>>
>>>> -------- Original Message --------
>>>> Subject: Re: Using watermarks with bounded sources
>>>>
>>>> Local Time: April 23, 2017 10:18 AM
>>>> UTC Time: April 23, 2017 2:18 PM
>>>> From: peay@protonmail.com
>>>> To: Eugene Kirpichov <ki...@google.com>
>>>> user@beam.apache.org <us...@beam.apache.org>
>>>>
>>>> Ah, I didn't know about that. This is *really* great -- from a quick
>>>> look, the API looks both very natural and very powerful. Thanks a lot for
>>>> getting this into Beam!
>>>>
>>>> I see Flink support seems to have been merged already. Any idea on when
>>>> https://github.com/apache/beam/pull/1898 will get merged?
>>>>
>>>> I see updateWatermark in the API but not in the proposal's examples
>>>> which only uses resume/withFutureOutputWatermark.  Any reason
>>>> why updateWatermark is not called after each output in the examples from
>>>> the proposal? I guess that would be "too fined-grained" to update it for
>>>> each individual record of a mini-batch?
>>>>
>>>> In my case with existing hourly files, would `outputElement(01:00
>>>> file), updateWatermark(01:00), outputElement(02:00),
>>>> updateWatermark(02:00), ...`  be the proper way to output per-hour elements
>>>> while gradually moving the watermark forward while going through an
>>>> existing list? Or would you instead suggest to still use resume
>>>> (potentially with were small timeouts)?
>>>>
>>>> Thanks,
>>>>
>>>> -------- Original Message --------
>>>> Subject: Re: Using watermarks with bounded sources
>>>> Local Time: 22 April 2017 3:59 PM
>>>> UTC Time: 22 April 2017 19:59
>>>> From: kirpichov@google.com
>>>> To: peay <pe...@protonmail.com>, user@beam.apache.org <
>>>> user@beam.apache.org>
>>>>
>>>> Hi! This is an excellent question; don't have time to reply in much
>>>> detail right now, but please take a look at
>>>> http://s.apache.org/splittable-do-fn - it unifies the concepts of
>>>> bounded and unbounded sources, and the use case you mentioned is one of the
>>>> motivating examples.
>>>>
>>>> Also, see recent discussions on pipeline termination semantics:
>>>> technically nothing should prevent an unbounded source from saying it's
>>>> done "for real" (no new data will appear), just the current UnboundedSource
>>>> API does not expose such a method. (but Splittable DoFn does)
>>>>
>>>> On Sat, Apr 22, 2017 at 11:15 AM peay <pe...@protonmail.com> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> A use case I find myself running into frequently is the following: I
>>>>> have daily or hourly files, and a Beam pipeline with a small to moderate
>>>>> size windows. (Actually, I've just seen that support for per-window files
>>>>> support in file based sinks was recently checked in, which is one way to
>>>>> get there).
>>>>>
>>>>> Now, Beam has no clue about the fact that each file corresponds to a
>>>>> given time interval. My understanding is that when running the pipeline in
>>>>> batch mode with a bounded source, there is no notion watermark and we have
>>>>> to load everything because we just don't know. This is pretty wasteful,
>>>>> especially as you have to keep a lot of data in memory, while you could in
>>>>> principle operate close to what you'd do in streaming mode: first read the
>>>>> oldest files, then newest files, moving the watermark forward as you go
>>>>> through the input list of files.
>>>>>
>>>>> I see one way around this. Let's say that I have hourly files and
>>>>> let's not assume anything about the order of records within the file to
>>>>> keep it simple: I don't want a very precise record-level watermark, but
>>>>> more a rough watermark at the granularity of hours. Say we can easily get
>>>>> the corresponding time interval from the filename. One can make an
>>>>> unbounded source that essentially acts as a "List of bounded file-based
>>>>> sources". If there are K splits, split k can read every file that has
>>>>> `index % K == k` in the time-ordered list of files. `advance` can advance
>>>>> the current file, and move on to the next one if no records were read.
>>>>>
>>>>> However, as far as I understand, this pipeline will never terminate
>>>>> since this is an unbounded source and having the `advance` method of our
>>>>> wrapping source return `false` won't make the pipeline terminate. Can
>>>>> someone confirm if this is correct? If yes, what would be ways to work
>>>>> around that? There's always the option to throw to make the pipeline fail,
>>>>> but this is far from ideal.
>>>>>
>>>>> Thanks,
>>>>>
>>>>
>>>>

Re: Using watermarks with bounded sources

Posted by Eugene Kirpichov <ki...@google.com>.
Yes, there's a pending PR https://github.com/apache/beam/pull/3360

Note that there are some shading issues with the Dataflow streaming runner
support. It passes ValidatesRunner tests, but will likely fail in a real
job :-| I'm working on resolving this with +Kenn Knowles <kl...@google.com>
right now.

On Wed, Jun 21, 2017 at 2:18 PM peay <pe...@protonmail.com> wrote:

> Thanks, great news! Are there plans to get ProcessContinuation from the
> original proposal into the API?
>
> -------- Original Message --------
> Subject: Re: Using watermarks with bounded sources
> Local Time: June 20, 2017 7:52 PM
> UTC Time: June 20, 2017 11:52 PM
> From: kirpichov@google.com
>
> To: peay <pe...@protonmail.com>, klk@google.com <kl...@google.com>
> user@beam.apache.org <us...@beam.apache.org>
>
> Hi!
>
> The PR just got submitted. You can play with SDF in Dataflow streaming
> runner now :) Hope it doesn't get rolled back (fingers crossed)...
>
> On Mon, Jun 19, 2017 at 6:06 PM Eugene Kirpichov <ki...@google.com>
> wrote:
>
>> Hi,
>> The PR is ready and I'm just struggling with setup of tests - Dataflow
>> ValidatesRunner tests currently don't have a streaming execution.
>> I think +Kenn Knowles <kl...@google.com> was doing something about that,
>> or I might find a workaround.
>>
>> But basically if you want to experiment - if you patch in the PR, you can
>> experiment with SDF in Dataflow in streaming mode. It passes tests against
>> the current production Dataflow Service.
>>
>>
>> On Thu, Jun 15, 2017 at 8:54 AM peay <pe...@protonmail.com> wrote:
>>
>>> Eugene, would you have an ETA on when splittable DoFn would be available
>>> in Dataflow in batch/streaming mode? I see that
>>> https://github.com/apache/beam/pull/1898 is still active
>>>
>>> I've started to experiment with those using the DirectRunner and this is
>>> a great API.
>>>
>>> Thanks!
>>>
>>>
>>> -------- Original Message --------
>>> Subject: Re: Using watermarks with bounded sources
>>>
>>> Local Time: April 23, 2017 10:18 AM
>>> UTC Time: April 23, 2017 2:18 PM
>>> From: peay@protonmail.com
>>> To: Eugene Kirpichov <ki...@google.com>
>>> user@beam.apache.org <us...@beam.apache.org>
>>>
>>> Ah, I didn't know about that. This is *really* great -- from a quick
>>> look, the API looks both very natural and very powerful. Thanks a lot for
>>> getting this into Beam!
>>>
>>> I see Flink support seems to have been merged already. Any idea on when
>>> https://github.com/apache/beam/pull/1898 will get merged?
>>>
>>> I see updateWatermark in the API but not in the proposal's examples
>>> which only uses resume/withFutureOutputWatermark.  Any reason
>>> why updateWatermark is not called after each output in the examples from
>>> the proposal? I guess that would be "too fined-grained" to update it for
>>> each individual record of a mini-batch?
>>>
>>> In my case with existing hourly files, would `outputElement(01:00 file),
>>> updateWatermark(01:00), outputElement(02:00), updateWatermark(02:00), ...`
>>>  be the proper way to output per-hour elements while gradually moving the
>>> watermark forward while going through an existing list? Or would you
>>> instead suggest to still use resume (potentially with were small timeouts)?
>>>
>>> Thanks,
>>>
>>> -------- Original Message --------
>>> Subject: Re: Using watermarks with bounded sources
>>> Local Time: 22 April 2017 3:59 PM
>>> UTC Time: 22 April 2017 19:59
>>> From: kirpichov@google.com
>>> To: peay <pe...@protonmail.com>, user@beam.apache.org <
>>> user@beam.apache.org>
>>>
>>> Hi! This is an excellent question; don't have time to reply in much
>>> detail right now, but please take a look at
>>> http://s.apache.org/splittable-do-fn - it unifies the concepts of
>>> bounded and unbounded sources, and the use case you mentioned is one of the
>>> motivating examples.
>>>
>>> Also, see recent discussions on pipeline termination semantics:
>>> technically nothing should prevent an unbounded source from saying it's
>>> done "for real" (no new data will appear), just the current UnboundedSource
>>> API does not expose such a method. (but Splittable DoFn does)
>>>
>>> On Sat, Apr 22, 2017 at 11:15 AM peay <pe...@protonmail.com> wrote:
>>>
>>>> Hello,
>>>>
>>>> A use case I find myself running into frequently is the following: I
>>>> have daily or hourly files, and a Beam pipeline with a small to moderate
>>>> size windows. (Actually, I've just seen that support for per-window files
>>>> support in file based sinks was recently checked in, which is one way to
>>>> get there).
>>>>
>>>> Now, Beam has no clue about the fact that each file corresponds to a
>>>> given time interval. My understanding is that when running the pipeline in
>>>> batch mode with a bounded source, there is no notion watermark and we have
>>>> to load everything because we just don't know. This is pretty wasteful,
>>>> especially as you have to keep a lot of data in memory, while you could in
>>>> principle operate close to what you'd do in streaming mode: first read the
>>>> oldest files, then newest files, moving the watermark forward as you go
>>>> through the input list of files.
>>>>
>>>> I see one way around this. Let's say that I have hourly files and let's
>>>> not assume anything about the order of records within the file to keep it
>>>> simple: I don't want a very precise record-level watermark, but more a
>>>> rough watermark at the granularity of hours. Say we can easily get the
>>>> corresponding time interval from the filename. One can make an unbounded
>>>> source that essentially acts as a "List of bounded file-based sources". If
>>>> there are K splits, split k can read every file that has `index % K == k`
>>>> in the time-ordered list of files. `advance` can advance the current file,
>>>> and move on to the next one if no records were read.
>>>>
>>>> However, as far as I understand, this pipeline will never terminate
>>>> since this is an unbounded source and having the `advance` method of our
>>>> wrapping source return `false` won't make the pipeline terminate. Can
>>>> someone confirm if this is correct? If yes, what would be ways to work
>>>> around that? There's always the option to throw to make the pipeline fail,
>>>> but this is far from ideal.
>>>>
>>>> Thanks,
>>>>
>>>
>>>

Re: Using watermarks with bounded sources

Posted by peay <pe...@protonmail.com>.
Thanks, great news! Are there plans to get ProcessContinuation from the original proposal into the API?

-------- Original Message --------
Subject: Re: Using watermarks with bounded sources
Local Time: June 20, 2017 7:52 PM
UTC Time: June 20, 2017 11:52 PM
From: kirpichov@google.com
To: peay <pe...@protonmail.com>, klk@google.com <kl...@google.com>
user@beam.apache.org <us...@beam.apache.org>

Hi!

The PR just got submitted. You can play with SDF in Dataflow streaming runner now :) Hope it doesn't get rolled back (fingers crossed)...

On Mon, Jun 19, 2017 at 6:06 PM Eugene Kirpichov <ki...@google.com> wrote:

Hi,
The PR is ready and I'm just struggling with setup of tests - Dataflow ValidatesRunner tests currently don't have a streaming execution.
I think [+Kenn Knowles](mailto:klk@google.com) was doing something about that, or I might find a workaround.

But basically if you want to experiment - if you patch in the PR, you can experiment with SDF in Dataflow in streaming mode. It passes tests against the current production Dataflow Service.

On Thu, Jun 15, 2017 at 8:54 AM peay <pe...@protonmail.com> wrote:
Eugene, would you have an ETA on when splittable DoFn would be available in Dataflow in batch/streaming mode? I see that https://github.com/apache/beam/pull/1898 is still active

I've started to experiment with those using the DirectRunner and this is a great API.

Thanks!

-------- Original Message --------
Subject: Re: Using watermarks with bounded sources
Local Time: April 23, 2017 10:18 AM
UTC Time: April 23, 2017 2:18 PM
From: peay@protonmail.com
To: Eugene Kirpichov <ki...@google.com>
user@beam.apache.org <us...@beam.apache.org>

Ah, I didn't know about that. This is *really* great -- from a quick look, the API looks both very natural and very powerful. Thanks a lot for getting this into Beam!

I see Flink support seems to have been merged already. Any idea on when https://github.com/apache/beam/pull/1898 will get merged?

I see updateWatermark in the API but not in the proposal's examples which only uses resume/withFutureOutputWatermark. Any reason why updateWatermark is not called after each output in the examples from the proposal? I guess that would be "too fined-grained" to update it for each individual record of a mini-batch?

In my case with existing hourly files, would `outputElement(01:00 file), updateWatermark(01:00), outputElement(02:00), updateWatermark(02:00), ...` be the proper way to output per-hour elements while gradually moving the watermark forward while going through an existing list? Or would you instead suggest to still use resume (potentially with were small timeouts)?

Thanks,

-------- Original Message --------
Subject: Re: Using watermarks with bounded sources
Local Time: 22 April 2017 3:59 PM
UTC Time: 22 April 2017 19:59
From: kirpichov@google.com
To: peay <pe...@protonmail.com>, user@beam.apache.org <us...@beam.apache.org>

Hi! This is an excellent question; don't have time to reply in much detail right now, but please take a look at http://s.apache.org/splittable-do-fn - it unifies the concepts of bounded and unbounded sources, and the use case you mentioned is one of the motivating examples.

Also, see recent discussions on pipeline termination semantics: technically nothing should prevent an unbounded source from saying it's done "for real" (no new data will appear), just the current UnboundedSource API does not expose such a method. (but Splittable DoFn does)

On Sat, Apr 22, 2017 at 11:15 AM peay <pe...@protonmail.com> wrote:
Hello,

A use case I find myself running into frequently is the following: I have daily or hourly files, and a Beam pipeline with a small to moderate size windows. (Actually, I've just seen that support for per-window files support in file based sinks was recently checked in, which is one way to get there).

Now, Beam has no clue about the fact that each file corresponds to a given time interval. My understanding is that when running the pipeline in batch mode with a bounded source, there is no notion watermark and we have to load everything because we just don't know. This is pretty wasteful, especially as you have to keep a lot of data in memory, while you could in principle operate close to what you'd do in streaming mode: first read the oldest files, then newest files, moving the watermark forward as you go through the input list of files.

I see one way around this. Let's say that I have hourly files and let's not assume anything about the order of records within the file to keep it simple: I don't want a very precise record-level watermark, but more a rough watermark at the granularity of hours. Say we can easily get the corresponding time interval from the filename. One can make an unbounded source that essentially acts as a "List of bounded file-based sources". If there are K splits, split k can read every file that has `index % K == k` in the time-ordered list of files. `advance` can advance the current file, and move on to the next one if no records were read.

However, as far as I understand, this pipeline will never terminate since this is an unbounded source and having the `advance` method of our wrapping source return `false` won't make the pipeline terminate. Can someone confirm if this is correct? If yes, what would be ways to work around that? There's always the option to throw to make the pipeline fail, but this is far from ideal.

Thanks,

Re: Using watermarks with bounded sources

Posted by Eugene Kirpichov <ki...@google.com>.
Hi!

The PR just got submitted. You can play with SDF in Dataflow streaming
runner now :) Hope it doesn't get rolled back (fingers crossed)...

On Mon, Jun 19, 2017 at 6:06 PM Eugene Kirpichov <ki...@google.com>
wrote:

> Hi,
> The PR is ready and I'm just struggling with setup of tests - Dataflow
> ValidatesRunner tests currently don't have a streaming execution.
> I think +Kenn Knowles <kl...@google.com> was doing something about that, or
> I might find a workaround.
>
> But basically if you want to experiment - if you patch in the PR, you can
> experiment with SDF in Dataflow in streaming mode. It passes tests against
> the current production Dataflow Service.
>
>
> On Thu, Jun 15, 2017 at 8:54 AM peay <pe...@protonmail.com> wrote:
>
>> Eugene, would you have an ETA on when splittable DoFn would be available
>> in Dataflow in batch/streaming mode? I see that
>> https://github.com/apache/beam/pull/1898 is still active
>>
>> I've started to experiment with those using the DirectRunner and this is
>> a great API.
>>
>> Thanks!
>>
>> -------- Original Message --------
>> Subject: Re: Using watermarks with bounded sources
>>
>> Local Time: April 23, 2017 10:18 AM
>> UTC Time: April 23, 2017 2:18 PM
>> From: peay@protonmail.com
>> To: Eugene Kirpichov <ki...@google.com>
>> user@beam.apache.org <us...@beam.apache.org>
>>
>> Ah, I didn't know about that. This is *really* great -- from a quick
>> look, the API looks both very natural and very powerful. Thanks a lot for
>> getting this into Beam!
>>
>> I see Flink support seems to have been merged already. Any idea on when
>> https://github.com/apache/beam/pull/1898 will get merged?
>>
>> I see updateWatermark in the API but not in the proposal's examples which
>> only uses resume/withFutureOutputWatermark.  Any reason why updateWatermark
>> is not called after each output in the examples from the proposal? I guess
>> that would be "too fined-grained" to update it for each individual record
>> of a mini-batch?
>>
>> In my case with existing hourly files, would `outputElement(01:00 file),
>> updateWatermark(01:00), outputElement(02:00), updateWatermark(02:00), ...`
>>  be the proper way to output per-hour elements while gradually moving the
>> watermark forward while going through an existing list? Or would you
>> instead suggest to still use resume (potentially with were small timeouts)?
>>
>> Thanks,
>>
>> -------- Original Message --------
>> Subject: Re: Using watermarks with bounded sources
>> Local Time: 22 April 2017 3:59 PM
>> UTC Time: 22 April 2017 19:59
>> From: kirpichov@google.com
>> To: peay <pe...@protonmail.com>, user@beam.apache.org <
>> user@beam.apache.org>
>>
>> Hi! This is an excellent question; don't have time to reply in much
>> detail right now, but please take a look at
>> http://s.apache.org/splittable-do-fn - it unifies the concepts of
>> bounded and unbounded sources, and the use case you mentioned is one of the
>> motivating examples.
>>
>> Also, see recent discussions on pipeline termination semantics:
>> technically nothing should prevent an unbounded source from saying it's
>> done "for real" (no new data will appear), just the current UnboundedSource
>> API does not expose such a method. (but Splittable DoFn does)
>>
>> On Sat, Apr 22, 2017 at 11:15 AM peay <pe...@protonmail.com> wrote:
>>
>>> Hello,
>>>
>>> A use case I find myself running into frequently is the following: I
>>> have daily or hourly files, and a Beam pipeline with a small to moderate
>>> size windows. (Actually, I've just seen that support for per-window files
>>> support in file based sinks was recently checked in, which is one way to
>>> get there).
>>>
>>> Now, Beam has no clue about the fact that each file corresponds to a
>>> given time interval. My understanding is that when running the pipeline in
>>> batch mode with a bounded source, there is no notion watermark and we have
>>> to load everything because we just don't know. This is pretty wasteful,
>>> especially as you have to keep a lot of data in memory, while you could in
>>> principle operate close to what you'd do in streaming mode: first read the
>>> oldest files, then newest files, moving the watermark forward as you go
>>> through the input list of files.
>>>
>>> I see one way around this. Let's say that I have hourly files and let's
>>> not assume anything about the order of records within the file to keep it
>>> simple: I don't want a very precise record-level watermark, but more a
>>> rough watermark at the granularity of hours. Say we can easily get the
>>> corresponding time interval from the filename. One can make an unbounded
>>> source that essentially acts as a "List of bounded file-based sources". If
>>> there are K splits, split k can read every file that has `index % K == k`
>>> in the time-ordered list of files. `advance` can advance the current file,
>>> and move on to the next one if no records were read.
>>>
>>> However, as far as I understand, this pipeline will never terminate
>>> since this is an unbounded source and having the `advance` method of our
>>> wrapping source return `false` won't make the pipeline terminate. Can
>>> someone confirm if this is correct? If yes, what would be ways to work
>>> around that? There's always the option to throw to make the pipeline fail,
>>> but this is far from ideal.
>>>
>>> Thanks,
>>>
>>
>>

Re: Using watermarks with bounded sources

Posted by Eugene Kirpichov <ki...@google.com>.
Hi,
The PR is ready and I'm just struggling with setup of tests - Dataflow
ValidatesRunner tests currently don't have a streaming execution.
I think +Kenn Knowles <kl...@google.com> was doing something about that, or I
might find a workaround.

But basically if you want to experiment - if you patch in the PR, you can
experiment with SDF in Dataflow in streaming mode. It passes tests against
the current production Dataflow Service.


On Thu, Jun 15, 2017 at 8:54 AM peay <pe...@protonmail.com> wrote:

> Eugene, would you have an ETA on when splittable DoFn would be available
> in Dataflow in batch/streaming mode? I see that
> https://github.com/apache/beam/pull/1898 is still active
>
> I've started to experiment with those using the DirectRunner and this is a
> great API.
>
> Thanks!
>
> -------- Original Message --------
> Subject: Re: Using watermarks with bounded sources
>
> Local Time: April 23, 2017 10:18 AM
> UTC Time: April 23, 2017 2:18 PM
> From: peay@protonmail.com
> To: Eugene Kirpichov <ki...@google.com>
> user@beam.apache.org <us...@beam.apache.org>
>
> Ah, I didn't know about that. This is *really* great -- from a quick look,
> the API looks both very natural and very powerful. Thanks a lot for getting
> this into Beam!
>
> I see Flink support seems to have been merged already. Any idea on when
> https://github.com/apache/beam/pull/1898 will get merged?
>
> I see updateWatermark in the API but not in the proposal's examples which
> only uses resume/withFutureOutputWatermark.  Any reason why updateWatermark
> is not called after each output in the examples from the proposal? I guess
> that would be "too fined-grained" to update it for each individual record
> of a mini-batch?
>
> In my case with existing hourly files, would `outputElement(01:00 file),
> updateWatermark(01:00), outputElement(02:00), updateWatermark(02:00), ...`
>  be the proper way to output per-hour elements while gradually moving the
> watermark forward while going through an existing list? Or would you
> instead suggest to still use resume (potentially with were small timeouts)?
>
> Thanks,
>
> -------- Original Message --------
> Subject: Re: Using watermarks with bounded sources
> Local Time: 22 April 2017 3:59 PM
> UTC Time: 22 April 2017 19:59
> From: kirpichov@google.com
> To: peay <pe...@protonmail.com>, user@beam.apache.org <user@beam.apache.org
> >
>
> Hi! This is an excellent question; don't have time to reply in much detail
> right now, but please take a look at http://s.apache.org/splittable-do-fn -
> it unifies the concepts of bounded and unbounded sources, and the use case
> you mentioned is one of the motivating examples.
>
> Also, see recent discussions on pipeline termination semantics:
> technically nothing should prevent an unbounded source from saying it's
> done "for real" (no new data will appear), just the current UnboundedSource
> API does not expose such a method. (but Splittable DoFn does)
>
> On Sat, Apr 22, 2017 at 11:15 AM peay <pe...@protonmail.com> wrote:
>
>> Hello,
>>
>> A use case I find myself running into frequently is the following: I have
>> daily or hourly files, and a Beam pipeline with a small to moderate size
>> windows. (Actually, I've just seen that support for per-window files
>> support in file based sinks was recently checked in, which is one way to
>> get there).
>>
>> Now, Beam has no clue about the fact that each file corresponds to a
>> given time interval. My understanding is that when running the pipeline in
>> batch mode with a bounded source, there is no notion watermark and we have
>> to load everything because we just don't know. This is pretty wasteful,
>> especially as you have to keep a lot of data in memory, while you could in
>> principle operate close to what you'd do in streaming mode: first read the
>> oldest files, then newest files, moving the watermark forward as you go
>> through the input list of files.
>>
>> I see one way around this. Let's say that I have hourly files and let's
>> not assume anything about the order of records within the file to keep it
>> simple: I don't want a very precise record-level watermark, but more a
>> rough watermark at the granularity of hours. Say we can easily get the
>> corresponding time interval from the filename. One can make an unbounded
>> source that essentially acts as a "List of bounded file-based sources". If
>> there are K splits, split k can read every file that has `index % K == k`
>> in the time-ordered list of files. `advance` can advance the current file,
>> and move on to the next one if no records were read.
>>
>> However, as far as I understand, this pipeline will never terminate since
>> this is an unbounded source and having the `advance` method of our wrapping
>> source return `false` won't make the pipeline terminate. Can someone
>> confirm if this is correct? If yes, what would be ways to work around that?
>> There's always the option to throw to make the pipeline fail, but this is
>> far from ideal.
>>
>> Thanks,
>>
>
>

Re: Using watermarks with bounded sources

Posted by peay <pe...@protonmail.com>.
Eugene, would you have an ETA on when splittable DoFn would be available in Dataflow in batch/streaming mode? I see that https://github.com/apache/beam/pull/1898 is still active

I've started to experiment with those using the DirectRunner and this is a great API.

Thanks!

-------- Original Message --------
Subject: Re: Using watermarks with bounded sources
Local Time: April 23, 2017 10:18 AM
UTC Time: April 23, 2017 2:18 PM
From: peay@protonmail.com
To: Eugene Kirpichov <ki...@google.com>
user@beam.apache.org <us...@beam.apache.org>

Ah, I didn't know about that. This is *really* great -- from a quick look, the API looks both very natural and very powerful. Thanks a lot for getting this into Beam!

I see Flink support seems to have been merged already. Any idea on when https://github.com/apache/beam/pull/1898 will get merged?

I see updateWatermark in the API but not in the proposal's examples which only uses resume/withFutureOutputWatermark. Any reason why updateWatermark is not called after each output in the examples from the proposal? I guess that would be "too fined-grained" to update it for each individual record of a mini-batch?

In my case with existing hourly files, would `outputElement(01:00 file), updateWatermark(01:00), outputElement(02:00), updateWatermark(02:00), ...` be the proper way to output per-hour elements while gradually moving the watermark forward while going through an existing list? Or would you instead suggest to still use resume (potentially with were small timeouts)?

Thanks,

-------- Original Message --------
Subject: Re: Using watermarks with bounded sources
Local Time: 22 April 2017 3:59 PM
UTC Time: 22 April 2017 19:59
From: kirpichov@google.com
To: peay <pe...@protonmail.com>, user@beam.apache.org <us...@beam.apache.org>

Hi! This is an excellent question; don't have time to reply in much detail right now, but please take a look at http://s.apache.org/splittable-do-fn - it unifies the concepts of bounded and unbounded sources, and the use case you mentioned is one of the motivating examples.

Also, see recent discussions on pipeline termination semantics: technically nothing should prevent an unbounded source from saying it's done "for real" (no new data will appear), just the current UnboundedSource API does not expose such a method. (but Splittable DoFn does)

On Sat, Apr 22, 2017 at 11:15 AM peay <pe...@protonmail.com> wrote:
Hello,

A use case I find myself running into frequently is the following: I have daily or hourly files, and a Beam pipeline with a small to moderate size windows. (Actually, I've just seen that support for per-window files support in file based sinks was recently checked in, which is one way to get there).

Now, Beam has no clue about the fact that each file corresponds to a given time interval. My understanding is that when running the pipeline in batch mode with a bounded source, there is no notion watermark and we have to load everything because we just don't know. This is pretty wasteful, especially as you have to keep a lot of data in memory, while you could in principle operate close to what you'd do in streaming mode: first read the oldest files, then newest files, moving the watermark forward as you go through the input list of files.

I see one way around this. Let's say that I have hourly files and let's not assume anything about the order of records within the file to keep it simple: I don't want a very precise record-level watermark, but more a rough watermark at the granularity of hours. Say we can easily get the corresponding time interval from the filename. One can make an unbounded source that essentially acts as a "List of bounded file-based sources". If there are K splits, split k can read every file that has `index % K == k` in the time-ordered list of files. `advance` can advance the current file, and move on to the next one if no records were read.

However, as far as I understand, this pipeline will never terminate since this is an unbounded source and having the `advance` method of our wrapping source return `false` won't make the pipeline terminate. Can someone confirm if this is correct? If yes, what would be ways to work around that? There's always the option to throw to make the pipeline fail, but this is far from ideal.

Thanks,

Re: Using watermarks with bounded sources

Posted by Lukasz Cwik <lc...@google.com>.
BoundedSource is able to report the timestamp[1] for records. It is just
that runners know that it is a fixed dataset so they have a trivial
optimization where the watermark goes from negative infinity to positive
infinity once all the data is read. For bounded splittable DoFns, its
likely that runners will perform the same optimization even if you are able
to report the watermark.

1:
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedSource.java#L395

On Sun, Apr 23, 2017 at 7:18 AM, peay <pe...@protonmail.com> wrote:

> Ah, I didn't know about that. This is *really* great -- from a quick look,
> the API looks both very natural and very powerful. Thanks a lot for getting
> this into Beam!
>
> I see Flink support seems to have been merged already. Any idea on when
> https://github.com/apache/beam/pull/1898 will get merged?
>
> I see updateWatermark in the API but not in the proposal's examples which
> only uses resume/withFutureOutputWatermark.  Any reason
> why updateWatermark is not called after each output in the examples from
> the proposal? I guess that would be "too fined-grained" to update it for
> each individual record of a mini-batch?
>
> In my case with existing hourly files, would `outputElement(01:00 file),
> updateWatermark(01:00), outputElement(02:00), updateWatermark(02:00), ...`
>  be the proper way to output per-hour elements while gradually moving the
> watermark forward while going through an existing list? Or would you
> instead suggest to still use resume (potentially with were small timeouts)?
>
> Thanks,
>
> -------- Original Message --------
> Subject: Re: Using watermarks with bounded sources
> Local Time: 22 April 2017 3:59 PM
> UTC Time: 22 April 2017 19:59
> From: kirpichov@google.com
> To: peay <pe...@protonmail.com>, user@beam.apache.org <user@beam.apache.org
> >
>
> Hi! This is an excellent question; don't have time to reply in much detail
> right now, but please take a look at http://s.apache.org/splittable-do-fn -
> it unifies the concepts of bounded and unbounded sources, and the use case
> you mentioned is one of the motivating examples.
>
> Also, see recent discussions on pipeline termination semantics:
> technically nothing should prevent an unbounded source from saying it's
> done "for real" (no new data will appear), just the current UnboundedSource
> API does not expose such a method. (but Splittable DoFn does)
>
> On Sat, Apr 22, 2017 at 11:15 AM peay <pe...@protonmail.com> wrote:
>
>> Hello,
>>
>> A use case I find myself running into frequently is the following: I have
>> daily or hourly files, and a Beam pipeline with a small to moderate size
>> windows. (Actually, I've just seen that support for per-window files
>> support in file based sinks was recently checked in, which is one way to
>> get there).
>>
>> Now, Beam has no clue about the fact that each file corresponds to a
>> given time interval. My understanding is that when running the pipeline in
>> batch mode with a bounded source, there is no notion watermark and we have
>> to load everything because we just don't know. This is pretty wasteful,
>> especially as you have to keep a lot of data in memory, while you could in
>> principle operate close to what you'd do in streaming mode: first read the
>> oldest files, then newest files, moving the watermark forward as you go
>> through the input list of files.
>>
>> I see one way around this. Let's say that I have hourly files and let's
>> not assume anything about the order of records within the file to keep it
>> simple: I don't want a very precise record-level watermark, but more a
>> rough watermark at the granularity of hours. Say we can easily get the
>> corresponding time interval from the filename. One can make an unbounded
>> source that essentially acts as a "List of bounded file-based sources". If
>> there are K splits, split k can read every file that has `index % K == k`
>> in the time-ordered list of files. `advance` can advance the current file,
>> and move on to the next one if no records were read.
>>
>> However, as far as I understand, this pipeline will never terminate since
>> this is an unbounded source and having the `advance` method of our wrapping
>> source return `false` won't make the pipeline terminate. Can someone
>> confirm if this is correct? If yes, what would be ways to work around that?
>> There's always the option to throw to make the pipeline fail, but this is
>> far from ideal.
>>
>> Thanks,
>>
>
>

Re: Using watermarks with bounded sources

Posted by peay <pe...@protonmail.com>.
Ah, I didn't know about that. This is *really* great -- from a quick look, the API looks both very natural and very powerful. Thanks a lot for getting this into Beam!

I see Flink support seems to have been merged already. Any idea on when https://github.com/apache/beam/pull/1898 will get merged?

I see updateWatermark in the API but not in the proposal's examples which only uses resume/withFutureOutputWatermark. Any reason why updateWatermark is not called after each output in the examples from the proposal? I guess that would be "too fined-grained" to update it for each individual record of a mini-batch?

In my case with existing hourly files, would `outputElement(01:00 file), updateWatermark(01:00), outputElement(02:00), updateWatermark(02:00), ...` be the proper way to output per-hour elements while gradually moving the watermark forward while going through an existing list? Or would you instead suggest to still use resume (potentially with were small timeouts)?

Thanks,

-------- Original Message --------
Subject: Re: Using watermarks with bounded sources
Local Time: 22 April 2017 3:59 PM
UTC Time: 22 April 2017 19:59
From: kirpichov@google.com
To: peay <pe...@protonmail.com>, user@beam.apache.org <us...@beam.apache.org>

Hi! This is an excellent question; don't have time to reply in much detail right now, but please take a look at http://s.apache.org/splittable-do-fn - it unifies the concepts of bounded and unbounded sources, and the use case you mentioned is one of the motivating examples.

Also, see recent discussions on pipeline termination semantics: technically nothing should prevent an unbounded source from saying it's done "for real" (no new data will appear), just the current UnboundedSource API does not expose such a method. (but Splittable DoFn does)

On Sat, Apr 22, 2017 at 11:15 AM peay <pe...@protonmail.com> wrote:
Hello,

A use case I find myself running into frequently is the following: I have daily or hourly files, and a Beam pipeline with a small to moderate size windows. (Actually, I've just seen that support for per-window files support in file based sinks was recently checked in, which is one way to get there).

Now, Beam has no clue about the fact that each file corresponds to a given time interval. My understanding is that when running the pipeline in batch mode with a bounded source, there is no notion watermark and we have to load everything because we just don't know. This is pretty wasteful, especially as you have to keep a lot of data in memory, while you could in principle operate close to what you'd do in streaming mode: first read the oldest files, then newest files, moving the watermark forward as you go through the input list of files.

I see one way around this. Let's say that I have hourly files and let's not assume anything about the order of records within the file to keep it simple: I don't want a very precise record-level watermark, but more a rough watermark at the granularity of hours. Say we can easily get the corresponding time interval from the filename. One can make an unbounded source that essentially acts as a "List of bounded file-based sources". If there are K splits, split k can read every file that has `index % K == k` in the time-ordered list of files. `advance` can advance the current file, and move on to the next one if no records were read.

However, as far as I understand, this pipeline will never terminate since this is an unbounded source and having the `advance` method of our wrapping source return `false` won't make the pipeline terminate. Can someone confirm if this is correct? If yes, what would be ways to work around that? There's always the option to throw to make the pipeline fail, but this is far from ideal.

Thanks,

Re: Using watermarks with bounded sources

Posted by Eugene Kirpichov <ki...@google.com>.
Hi! This is an excellent question; don't have time to reply in much detail
right now, but please take a look at http://s.apache.org/splittable-do-fn -
it unifies the concepts of bounded and unbounded sources, and the use case
you mentioned is one of the motivating examples.

Also, see recent discussions on pipeline termination semantics: technically
nothing should prevent an unbounded source from saying it's done "for real"
(no new data will appear), just the current UnboundedSource API does not
expose such a method. (but Splittable DoFn does)

On Sat, Apr 22, 2017 at 11:15 AM peay <pe...@protonmail.com> wrote:

> Hello,
>
> A use case I find myself running into frequently is the following: I have
> daily or hourly files, and a Beam pipeline with a small to moderate size
> windows. (Actually, I've just seen that support for per-window files
> support in file based sinks was recently checked in, which is one way to
> get there).
>
> Now, Beam has no clue about the fact that each file corresponds to a given
> time interval. My understanding is that when running the pipeline in batch
> mode with a bounded source, there is no notion watermark and we have to
> load everything because we just don't know. This is pretty wasteful,
> especially as you have to keep a lot of data in memory, while you could in
> principle operate close to what you'd do in streaming mode: first read the
> oldest files, then newest files, moving the watermark forward as you go
> through the input list of files.
>
> I see one way around this. Let's say that I have hourly files and let's
> not assume anything about the order of records within the file to keep it
> simple: I don't want a very precise record-level watermark, but more a
> rough watermark at the granularity of hours. Say we can easily get the
> corresponding time interval from the filename. One can make an unbounded
> source that essentially acts as a "List of bounded file-based sources". If
> there are K splits, split k can read every file that has `index % K == k`
> in the time-ordered list of files. `advance` can advance the current file,
> and move on to the next one if no records were read.
>
> However, as far as I understand, this pipeline will never terminate since
> this is an unbounded source and having the `advance` method of our wrapping
> source return `false` won't make the pipeline terminate. Can someone
> confirm if this is correct? If yes, what would be ways to work around that?
> There's always the option to throw to make the pipeline fail, but this is
> far from ideal.
>
> Thanks,
>