Posted to dev@beam.apache.org by Shen Li <cs...@gmail.com> on 2017/04/20 20:42:57 UTC

Can application specify how watermarks should be generated?

Hi,

Can application developers provide classes/methods to specify how to
generate watermarks from sources, and how to aggregate watermarks from
multiple input PCollections? Say, emit at most 1 watermark per second, or
create watermarks that are 5 seconds older than the latest tuple's
timestamp?

Thanks,

Shen

Re: Can application specify how watermarks should be generated?

Posted by Shen Li <cs...@gmail.com>.
Thank you, Thomas. That clears up my confusion.  :)

Shen

On Tue, Apr 25, 2017 at 7:30 PM, Thomas Groh <tg...@google.com.invalid>
wrote:

> getCurrentTimestamp returns the timestamp of the current element. Both
> Bounded and Unbounded Readers have this method.
>
> For a bounded source, this is safe - the source watermark can be held to
> negative infinity while elements remain in the source and advance to
> infinity after all elements are read, and elements can be arbitrarily
> shifted forwards in time later in the pipeline (for example, via a
> "WithTimestamps" transform or a DoFn that uses "outputWithTimestamp"). It's
> not safe to output elements at negative infinity when there is a watermark
> that may drop elements, as is the case for unbounded sources.

Re: Can application specify how watermarks should be generated?

Posted by Thomas Groh <tg...@google.com.INVALID>.
getCurrentTimestamp returns the timestamp of the current element. Both
Bounded and Unbounded Readers have this method.

For a bounded source, defaulting to the minimum timestamp is safe: the
source watermark can be held at negative infinity while elements remain in
the source and advance to positive infinity after all elements are read,
and elements can be shifted arbitrarily far forward in time later in the
pipeline (for example, via a "WithTimestamps" transform or a DoFn that uses
"outputWithTimestamp"). It's not safe to output elements at negative
infinity when there is a watermark that may cause them to be dropped, as is
the case for unbounded sources.
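
To make the bounded/unbounded difference concrete, here is a toy sketch of
the reasoning above. It is not Beam code: the class and method names are
made up for illustration, java.time.Instant stands in for the Joda-Time
Instant that Beam uses, and the lateness check is simplified (it ignores
allowed lateness).

```java
import java.time.Instant;

/** Illustrative toy model only; none of these names exist in Beam. */
final class BoundedWatermarkSketch {
  // Stand-ins for Beam's BoundedWindow.TIMESTAMP_MIN_VALUE / TIMESTAMP_MAX_VALUE.
  static final Instant NEGATIVE_INFINITY = Instant.MIN;
  static final Instant POSITIVE_INFINITY = Instant.MAX;

  /** A bounded source can hold its watermark at the minimum until it is drained. */
  static Instant boundedSourceWatermark(boolean elementsRemain) {
    return elementsRemain ? NEGATIVE_INFINITY : POSITIVE_INFINITY;
  }

  /** Simplified check: an element behind the watermark is at risk of being dropped. */
  static boolean isBehindWatermark(Instant elementTimestamp, Instant watermark) {
    return elementTimestamp.isBefore(watermark);
  }
}
```

With the watermark held at the minimum, an element stamped at the minimum
is never behind it. Against a watermark that has already advanced, as with
an unbounded source, the same element would be behind the watermark.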

On Fri, Apr 21, 2017 at 8:44 AM, Shen Li <cs...@gmail.com> wrote:

> Hi,
>
> A follow-up question. I found that the getWatermark() API is only available
> for UnboundedSource. BoundedSource provides a getCurrentTimestamp() API
> with comments "By default, returns the minimum possible timestamp", which
> sounds like a watermark. Any reason for the difference in method names?
>
> Shen

Re: Can application specify how watermarks should be generated?

Posted by Shen Li <cs...@gmail.com>.
Hi,

A follow-up question. I found that the getWatermark() API is only available
for UnboundedSource. BoundedSource provides a getCurrentTimestamp() API
with comments "By default, returns the minimum possible timestamp", which
sounds like a watermark. Any reason for the difference in method names?

Shen

On Thu, Apr 20, 2017 at 11:46 PM, Shen Li <cs...@gmail.com> wrote:

> Thanks!
>
> Shen

Re: Can application specify how watermarks should be generated?

Posted by Shen Li <cs...@gmail.com>.
Thanks!

Shen

On Thu, Apr 20, 2017 at 8:07 PM, Mingmin Xu <mi...@gmail.com> wrote:

> In KafkaIO, you can provide a custom watermark function to control how the
> current watermark advances. I'm not familiar with the other unbounded IOs,
> but I assume they could support the same thing, since getWatermark() is
> defined in org.apache.beam.sdk.io.UnboundedSource.UnboundedReader.
>
> Here is a quick example that holds the watermark 10 seconds behind
> processing time; you can add more complex logic based on the KafkaRecord
> content.
> KafkaIO.<String, String>read()
>   .withWatermarkFn2(
>       new SerializableFunction<KafkaRecord<String, String>, Instant>() {
>         @Override
>         public Instant apply(KafkaRecord<String, String> input) {
>           // new Instant() is the current time (Joda-Time); lag it by 10 seconds.
>           return new Instant().minus(Duration.standardSeconds(10));
>         }
>       })

Re: Can application specify how watermarks should be generated?

Posted by Mingmin Xu <mi...@gmail.com>.
In KafkaIO, you can provide a custom watermark function to control how the
current watermark advances. I'm not familiar with the other unbounded IOs,
but I assume they could support the same thing, since getWatermark() is
defined in org.apache.beam.sdk.io.UnboundedSource.UnboundedReader.

Here is a quick example that holds the watermark 10 seconds behind
processing time; you can add more complex logic based on the KafkaRecord
content.
KafkaIO.<String, String>read()
  .withWatermarkFn2(
      new SerializableFunction<KafkaRecord<String, String>, Instant>() {
        @Override
        public Instant apply(KafkaRecord<String, String> input) {
          // new Instant() is the current time (Joda-Time); lag it by 10 seconds.
          return new Instant().minus(Duration.standardSeconds(10));
        }
      })
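
The original question asked about watermarks that trail the latest tuple's
timestamp by 5 seconds and advance at most once per second. The logic for
that could look roughly like the sketch below. This is a standalone
illustration, not a Beam or KafkaIO API: LaggingWatermarkPolicy and its
methods are hypothetical names, and it uses java.time instead of the
Joda-Time types Beam uses so that it runs without any dependencies.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper, not part of Beam: a throttled, lagging watermark. */
final class LaggingWatermarkPolicy {
  private final Duration lag;          // how far the watermark trails the latest timestamp
  private final Duration minInterval;  // minimum processing time between advances
  private Instant maxEventTime;        // null until the first element is observed
  private Instant reported = Instant.MIN;
  private Instant lastAdvance;         // processing time of the last advance, or null

  LaggingWatermarkPolicy(Duration lag, Duration minInterval) {
    this.lag = lag;
    this.minInterval = minInterval;
  }

  /** Records an element timestamp; older timestamps never move the maximum back. */
  void observe(Instant elementTimestamp) {
    if (maxEventTime == null || elementTimestamp.isAfter(maxEventTime)) {
      maxEventTime = elementTimestamp;
    }
  }

  /** Returns the watermark, advancing it at most once per minInterval. */
  Instant currentWatermark(Instant processingTime) {
    boolean intervalElapsed =
        lastAdvance == null || !processingTime.isBefore(lastAdvance.plus(minInterval));
    if (maxEventTime != null && intervalElapsed) {
      Instant candidate = maxEventTime.minus(lag);
      if (candidate.isAfter(reported)) {
        reported = candidate;   // the watermark only ever moves forward
        lastAdvance = processingTime;
      }
    }
    return reported;
  }
}
```

A watermark function like the one above could call observe() with each
record's timestamp and return currentWatermark(now), assuming the function
is allowed to keep state between calls.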


On Thu, Apr 20, 2017 at 2:44 PM, Kenneth Knowles <kl...@google.com.invalid>
wrote:

> You want to use an existing source but just change the watermark tracking?
> You can't do this in your pipeline right now, but you could probably easily
> wrap a source and proxy every method except getWatermark, though I have
> never tried.
>
> The general feature that might address this is discussed a little on
> https://issues.apache.org/jira/browse/BEAM-644
>
> There are also related ideas in the discussions about Splittable DoFn.
>
> Kenn



-- 
----
Mingmin

Re: Can application specify how watermarks should be generated?

Posted by Kenneth Knowles <kl...@google.com.INVALID>.
You want to use an existing source but just change the watermark tracking?
You can't do this in your pipeline right now, but you could probably easily
wrap a source and proxy every method except getWatermark, though I have
never tried.
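
A rough sketch of the wrap-and-proxy idea, untested against real Beam
sources. SimpleReader is a hypothetical, cut-down interface used only to
keep the example self-contained; the real UnboundedReader has more methods
(start, getCheckpointMark, getCurrentSource, close) that a wrapper would
also have to delegate, and the wrapping UnboundedSource would need to
create the wrapped readers. java.time.Instant stands in for Joda-Time here.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Iterator;
import java.util.List;

/** Hypothetical, simplified stand-in for an unbounded reader; NOT the Beam API. */
interface SimpleReader<T> {
  boolean advance(); // moves to the next element; false if none is available
  T getCurrent();
  Instant getCurrentTimestamp();
  Instant getWatermark();
}

/** Proxies every call to the wrapped reader except getWatermark(). */
final class WatermarkOverridingReader<T> implements SimpleReader<T> {
  private final SimpleReader<T> delegate;
  private final Duration lag;
  private Instant maxTimestamp; // null until the first element is read

  WatermarkOverridingReader(SimpleReader<T> delegate, Duration lag) {
    this.delegate = delegate;
    this.lag = lag;
  }

  @Override
  public boolean advance() {
    boolean hasElement = delegate.advance();
    if (hasElement) {
      Instant ts = delegate.getCurrentTimestamp();
      if (maxTimestamp == null || ts.isAfter(maxTimestamp)) {
        maxTimestamp = ts; // remember the latest timestamp seen so far
      }
    }
    return hasElement;
  }

  @Override
  public T getCurrent() { return delegate.getCurrent(); }

  @Override
  public Instant getCurrentTimestamp() { return delegate.getCurrentTimestamp(); }

  /** The only overridden behavior: trail the latest timestamp seen by a fixed lag. */
  @Override
  public Instant getWatermark() {
    return maxTimestamp == null ? Instant.MIN : maxTimestamp.minus(lag);
  }
}

/** Toy in-memory reader for trying the wrapper; its own watermark never advances. */
final class InMemoryReader implements SimpleReader<Instant> {
  private final Iterator<Instant> remaining;
  private Instant current;

  InMemoryReader(List<Instant> timestamps) { this.remaining = timestamps.iterator(); }

  @Override
  public boolean advance() {
    if (!remaining.hasNext()) { return false; }
    current = remaining.next();
    return true;
  }

  @Override public Instant getCurrent() { return current; }
  @Override public Instant getCurrentTimestamp() { return current; }
  @Override public Instant getWatermark() { return Instant.MIN; }
}
```

Wrapping an InMemoryReader with a 5-second lag yields a watermark that
follows the largest timestamp read so far, while the element and timestamp
accessors behave exactly as they do on the wrapped reader.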

The general feature that might address this is discussed a little on
https://issues.apache.org/jira/browse/BEAM-644

There are also related ideas in the discussions about Splittable DoFn.

Kenn

On Thu, Apr 20, 2017 at 1:42 PM, Shen Li <cs...@gmail.com> wrote:

> Hi,
>
> Can application developers provide classes/methods to specify how to
> generate watermarks from sources, and how to aggregate watermarks from
> multiple input PCollections? Say, emit at most 1 watermark per second, or
> create watermarks that are 5 seconds older than the latest tuple's
> timestamp?
>
> Thanks,
>
> Shen
>