Posted to dev@apex.apache.org by Bhupesh Chawda <bh...@apache.org> on 2017/03/08 06:46:20 UTC

Watermark tuples in Apex

Hi All,

Watermark tuples in Apex are very tightly coupled to event time processing.
For this reason, they are usually modeled as carrying a timestamp:

public interface WatermarkTuple
{
  long getTimestamp();
}

Even though watermarks are meant for such time-related processing, I think
we should extend the concept of watermarks to the following types:

1. Labelled watermarks
This could be useful in scenarios where, instead of a timestamp (which is an
ordered field), we have categorical values. For example, consider tuples
which are labelled by city name. For each city, we want separate windows
and isolated processing. If the watermark carries a different city name, we
end the previous window and start a new one. Alternatively, we could use
both high and low watermarks to indicate the start and end of a city's
data. This would allow data for multiple windows to arrive at the same
time.

2. Ordered watermarks
Instead of having time as the ordered field, why not consider something
like an Ordered Watermark? Time-based watermarks could extend from that.
An ordered watermark could be used when we have a sequence of data tuples
and need to demarcate every n tuples. Even though we can say that the
window is definitely closed every n tuples, the decision is made only when
the upstream operator sends the watermark tuple. The windowed operator has
no knowledge of n; it blindly opens and closes windows based on the
watermarks received from upstream. This also means that different windows
may have different values of n.
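To make the two proposals concrete, here is a minimal sketch (Java 16+, for records). OrderedWatermark, TimeWatermark, SequenceWatermark and LabelledWatermark are hypothetical names used only for illustration; they are not part of the Apex or Malhar API. The windowsFrom method stands in for the windowed operator: it closes a window whenever an ordered watermark arrives, without knowing n.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical types sketching the proposal; none of them exist in Apex/Malhar.
public class WatermarkTypes {
  // Generalized watermark: any ordered position, not only a timestamp.
  interface OrderedWatermark<T extends Comparable<T>> {
    T getPosition();
  }

  // Time-based watermarks become one specialization of the ordered kind.
  record TimeWatermark(long timestamp) implements OrderedWatermark<Long> {
    public Long getPosition() {
      return timestamp;
    }
  }

  // Sequence watermark: upstream emits one after however many tuples (n) it chooses.
  record SequenceWatermark(long sequence) implements OrderedWatermark<Long> {
    public Long getPosition() {
      return sequence;
    }
  }

  // Labelled watermark: categorical, no ordering implied. START/END act as the
  // low/high marks around one label's data.
  enum Kind { START, END }

  record LabelledWatermark<L>(L label, Kind kind) {}

  // Stand-in for the windowed operator: it closes the current window whenever an
  // ordered watermark arrives, without knowing how many tuples upstream put in it.
  static List<List<String>> windowsFrom(List<Object> stream) {
    List<List<String>> closed = new ArrayList<>();
    List<String> current = new ArrayList<>();
    for (Object t : stream) {
      if (t instanceof OrderedWatermark<?>) {
        closed.add(current);
        current = new ArrayList<>();
      } else {
        current.add((String) t);
      }
    }
    return closed; // tuples after the last watermark remain in an open window
  }

  public static void main(String[] args) {
    List<Object> stream = List.of(
        "a", "b", new SequenceWatermark(2),
        "c", "d", "e", new SequenceWatermark(5),
        "f");
    System.out.println(windowsFrom(stream));
  }
}
```

Running main prints [[a, b], [c, d, e]]: upstream chose n = 2 and then n = 3, and the trailing "f" stays in a window that is still open.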

Please let me know your thoughts on this.

~ Bhupesh

Re: Watermark tuples in Apex

Posted by Pramod Immaneni <pr...@datatorrent.com>.
+1 for labelled watermarks.

Re: Watermark tuples in Apex

Posted by Bhupesh Chawda <bh...@datatorrent.com>.
Hi Ajay,

In the proposed scenario, the source needs to generate more types of
watermarks, and the windowed operator needs to be aware of them. Such
watermarks would also carry the information relevant to their type, viz.
labels, sequence numbers, etc. Thus the windowed operator could work with
watermarks based on something other than time.

Implicit watermarks are needed when we cannot generate the watermarks at
the source. In such a case, the windowed operator could itself "assume"
the watermarks according to predefined logic and act as if they had been
received from upstream. Since "implicit watermark" is an orthogonal
concept, the proposed watermarks could be used as implicit variants as well.
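As a rough illustration of an implicit variant of the ordered watermark, the sketch below (Java 16+) has the operator assume a sequence watermark after every n data tuples, as if it had been received from upstream. CountingGenerator and SequenceWatermark are hypothetical names, and this is not the signature of the implicitWatermarkGenerator mentioned in this thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

public class ImplicitOrderedWatermarks {
  // Hypothetical ordered watermark carrying a sequence number instead of a timestamp.
  record SequenceWatermark(long sequence) {}

  // Hypothetical "predefined logic": assume a watermark after every n data tuples.
  static final class CountingGenerator {
    private final int n;
    private long seen = 0;

    CountingGenerator(int n) {
      if (n <= 0) {
        throw new IllegalArgumentException("n must be positive");
      }
      this.n = n;
    }

    // Called once per data tuple; returns a watermark when one should be assumed.
    Optional<SequenceWatermark> onTuple() {
      seen++;
      return seen % n == 0 ? Optional.of(new SequenceWatermark(seen)) : Optional.empty();
    }
  }

  // The operator injects the assumed watermark into its own input, where an
  // upstream-generated one would have appeared, and then handles it the same way.
  static List<Object> withImplicitWatermarks(List<String> data, int n) {
    CountingGenerator generator = new CountingGenerator(n);
    List<Object> out = new ArrayList<>();
    for (String tuple : data) {
      out.add(tuple);
      generator.onTuple().ifPresent(out::add);
    }
    return out;
  }

  public static void main(String[] args) {
    System.out.println(withImplicitWatermarks(List.of("a", "b", "c", "d", "e"), 2));
  }
}
```

Running main prints [a, b, SequenceWatermark[sequence=2], c, d, SequenceWatermark[sequence=4], e]. Unlike the explicit case, n is fixed here by the operator's own logic rather than decided upstream.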

~ Bhupesh




_______________________________________________________

Bhupesh Chawda

E: bhupesh@datatorrent.com | Twitter: @bhupeshsc

www.datatorrent.com  |  apex.apache.org



On Fri, Mar 10, 2017 at 5:51 PM, AJAY GUPTA <aj...@gmail.com> wrote:

> Hi Bhupesh,
>
> For point 1, cant we make use of implicitWatermarkGenerator?
>
>
> Ajay

Re: Watermark tuples in Apex

Posted by AJAY GUPTA <aj...@gmail.com>.
Hi Bhupesh,

For point 1, can't we make use of implicitWatermarkGenerator?


Ajay

Re: Watermark tuples in Apex

Posted by Bhupesh Chawda <bh...@datatorrent.com>.


On Fri, Mar 10, 2017 at 8:56 PM, Thomas Weise <th...@apache.org> wrote:

> On Tue, Mar 7, 2017 at 10:46 PM, Bhupesh Chawda <bh...@apache.org> wrote:
>
> > Hi All,
> >
> > Watermark tuples in Apex are very tightly coupled to event time processing.
> > For this reason, usually they are modeled as having a timestamp.
> >
> > public interface WatermarkTuple
> > {
> >   long getTimestamp();
> > }
> >
> > Even though, watermarks are meant for such time related processing, I think
> > we should expand the concept of watermarks for the following types:
> >
> > 1. Labelled watermarks
> > This could be useful in scenarios where instead of a timestamp (which is an
> > ordered field), we have categorical values. For example, consider tuples
> > which are labeled by city names. For each city, we want to have separate
> > windows and isolate the processing. If the watermark returns a different
> > city name, we end the previous window and start a new one. Or, in this case
> > we could make use of both high and low watermarks indicating the start and
> > end of a city's data. This could mean having multiple windows' data
> > incoming at the same time.
> >
> >
> To me city looks like a key and you are trying to make the case that each
> key should have a separate watermark. That is the case discussed on the
> Flink/Beam list that David referred to. I think we should not mix the
> concepts of watermark and key.
>

Yes, city is a key here. This is similar to the discussion on per-key event
time progress on the Flink dev list. But the ask here is to have a non-time
watermark which indicates when a particular key is finalized. In other
words, if a source does not have timestamps, when should the finalization
be done for that data? What would a watermark tuple look like in that case?
To answer this, we may have to think about how the data is "windowed"
downstream. In this case, there could be a window per key, rather than a
time window.
If you look at the file batch changes that we discussed on the other
thread, they do the same thing, with key = window = filename. Why can't we
make it simpler by saying that the watermark is per key, instead of
modelling the windows as file ids and setting that as a time option on the
windowed operator?
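A minimal sketch of the per-key idea, assuming a hypothetical KeyDone watermark that carries only a label (here a file name, so key = window = filename). Each key has its own open window, several can be open at once, and a key's window is finalized only when its own watermark arrives. Line, KeyDone and PerKeyFinalization are illustrative names, not Apex APIs; the code requires Java 16+.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class PerKeyFinalization {
  // Hypothetical labelled watermark: "no more data will arrive for this key".
  record KeyDone(String key) {}

  // A data tuple tagged with its key (e.g. the file it was read from).
  record Line(String key, String text) {}

  // One open window per key; several can be open at the same time.
  private final Map<String, Integer> openWindows = new HashMap<>();
  // Finalized per-key results, in the order the keys were closed.
  final Map<String, Integer> finalized = new LinkedHashMap<>();

  void process(Object tuple) {
    if (tuple instanceof Line line) {
      openWindows.merge(line.key(), 1, Integer::sum); // count lines per key
    } else if (tuple instanceof KeyDone done) {
      // Finalize only this key's window; other keys stay open.
      Integer count = openWindows.remove(done.key());
      finalized.put(done.key(), count == null ? 0 : count);
    }
  }

  public static void main(String[] args) {
    PerKeyFinalization op = new PerKeyFinalization();
    Object[] stream = {
      new Line("a.txt", "x"), new Line("b.txt", "y"), new Line("a.txt", "z"),
      new KeyDone("b.txt"), new Line("a.txt", "w"), new KeyDone("a.txt")
    };
    for (Object t : stream) {
      op.process(t);
    }
    System.out.println(op.finalized);
  }
}
```

Running main prints {b.txt=1, a.txt=3}: b.txt is finalized first even though a.txt started earlier, and no timestamp is involved.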

>
> > 2. Ordered watermarks
> > Instead of having the ordered field as time, why not consider something
> > like an Ordered Watermark. TimeBased Watermarks could extend from that.
> > An ordered watermark could be used in case we have a sequence of data
> > tuples and we need to demarcate every n tuples. Even though we can say that
> > every n tuples the window is definitely closed, but the decision is made
> > only when the upstream sends the watermark tuple. The windowed operator
> > does not have any clue about it. It blindly opens and closes windows based
> > on watermarks received from upstream. This could mean different windows may
> > have different values of n.
> >
> > Please let me know your thoughts on this.
> >
> >
> Watermarks are already ordered and the state management is built based on
> that.
>
> Is your concern just the naming?
>

Yes. Actually, this is to avoid confusing users, since we would be using a
time window to create windows based on some other ordered field.


>
> Thanks,
> Thomas
>

Re: Watermark tuples in Apex

Posted by Thomas Weise <th...@apache.org>.

On Tue, Mar 7, 2017 at 10:46 PM, Bhupesh Chawda <bh...@apache.org> wrote:

> Hi All,
>
> Watermark tuples in Apex are very tightly coupled to event time processing.
> For this reason, usually they are modeled as having a timestamp.
>
> public interface WatermarkTuple
> {
>   long getTimestamp();
> }
>
> Even though, watermarks are meant for such time related processing, I think
> we should expand the concept of watermarks for the following types:
>
> 1. Labelled watermarks
> This could be useful in scenarios where instead of a timestamp (which is an
> ordered field), we have categorical values. For example, consider tuples
> which are labeled by city names. For each city, we want to have separate
> windows and isolate the processing. If the watermark returns a different
> city name, we end the previous window and start a new one. Or, in this case
> we could make use of both high and low watermarks indicating the start and
> end of a city's data. This could mean having multiple windows' data
> incoming at the same time.
>
>
To me, city looks like a key, and you are trying to make the case that each
key should have a separate watermark. That is the case discussed on the
Flink/Beam list that David referred to. I think we should not mix the
concepts of watermark and key.



> 2. Ordered watermarks
> Instead of having the ordered field as time, why not consider something
> like an Ordered Watermark. TimeBased Watermarks could extend from that.
> An ordered watermark could be used in case we have a sequence of data
> tuples and we need to demarcate every n tuples. Even though we can say that
> every n tuples the window is definitely closed, but the decision is made
> only when the upstream sends the watermark tuple. The windowed operator
> does not have any clue about it. It blindly opens and closes windows based
> on watermarks received from upstream. This could mean different windows may
> have different values of n.
>
> Please let me know your thoughts on this.
>
>
Watermarks are already ordered and the state management is built based on
that.

Is your concern just the naming?
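To illustrate how state management can rely on ordering alone, here is a simplified sketch (not how the Apex windowed operator actually stores its state). Window state is kept in a TreeMap keyed by window end position, and a watermark finalizes every window ending at or before it with one headMap range operation. The position type only needs to be Comparable, so the same logic works for timestamps or sequence numbers. OrderedStatePurge is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

public class OrderedStatePurge<P extends Comparable<P>> {
  // Window state keyed by the position at which each window ends.
  private final TreeMap<P, List<String>> state = new TreeMap<>();

  void add(P windowEnd, String value) {
    state.computeIfAbsent(windowEnd, k -> new ArrayList<>()).add(value);
  }

  // Because watermarks are ordered, every window ending at or before the
  // watermark can be finalized and its state dropped in one range operation.
  List<List<String>> onWatermark(P watermark) {
    NavigableMap<P, List<String>> done = state.headMap(watermark, true);
    List<List<String>> results = new ArrayList<>(done.values());
    done.clear(); // clearing the view removes those entries from state
    return results;
  }

  int openWindows() {
    return state.size();
  }

  public static void main(String[] args) {
    // Positions here are sequence numbers rather than timestamps.
    OrderedStatePurge<Long> op = new OrderedStatePurge<>();
    op.add(10L, "a");
    op.add(20L, "b");
    op.add(10L, "c");
    op.add(30L, "d");
    System.out.println(op.onWatermark(20L));
    System.out.println(op.openWindows());
  }
}
```

Running main prints [[a, c], [b]] and then 1, since only the window ending at 30 is still open.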


Thanks,
Thomas