Posted to dev@beam.apache.org by amit kumar <ak...@gmail.com> on 2020/03/31 08:22:05 UTC

Default WindowFn for Unbounded source

Hi All,

Is there a default WindowFn that gets applied to elements of an unbounded
source?

For example, if I have a Kinesis input source for which all elements are
timestamped with ArrivalTime, what will be the default windowing applied to
the output of the read transform?

Is this runner dependent?

Regards,
Amit

Re: Default WindowFn for Unbounded source

Posted by amit kumar <ak...@gmail.com>.
Thanks Ankur for your reply.

By default the allowed lateness for a global window is zero, but can we also
set it to a non-zero value that will then be used in the downstream
transforms where a group-by or a window-into with a trigger happens
(using allowedTimestampSkew for unbounded sources or sources that have
timestamped elements)?

In both scenarios that I described earlier for *source transforms*, is it
possible that the pipeline will drop data if I do not specify
allowedTimestampSkew/allowedLateness at the source transforms (given that I
have late-arriving data)? Can I just set allowed lateness in the transform
where I do the groupBy or windowInto rather than at the source?

In the case of TextIO.read, which reads from a bounded source, where I assign
timestamps to all elements in the second transform, will it be useful to
set allowedTimestampSkew at the source transform as well? I am trying to
understand how the elements will be available after assigning timestamps
(given that all files are present on the file system): will they be ordered
by timestamp, and can some elements be read after the watermark has
progressed past an element's event time?


TextIO.Read
     |  Bounded source
     |  Global Window
     |  -infinity watermark
apply
WithTimestamps (based on a timestamp attribute in the file)
     |  Timestamped elements (watermark starts from -infinity and follows
     |  the timestamp from the timestamp attribute)
     |  Global Window


Regards,
Amit



 have an unbounded source and which by default will have global windows and

In the scenario I provided, if I have downstream transforms that do a
group-by or a window-into with triggers, will allowed lateness be useful at
the source transforms? If allowedLateness only pushes back the timestamp of
the element, then it seems it will be useful.


Re: Default WindowFn for Unbounded source

Posted by amit kumar <ak...@gmail.com>.
Thank you all!
Your responses are very helpful.

On Wed, Apr 1, 2020 at 11:37 AM Robert Bradshaw <ro...@google.com> wrote:

> Clarification: sources should assign elements to their upstream window
> (similar to DoFns), generally with the appropriate timestamp (unless they
> are timestamp aware). The upstream of a bounded source is typically
> Impulse, which is in the global window with MIN_TIMESTAMP, but could be
> different. This better unifies the case of reading the elements from a set
> of filenames published to pubsub, for example.

Re: Default WindowFn for Unbounded source

Posted by Robert Bradshaw <ro...@google.com>.
On Wed, Apr 1, 2020 at 12:53 AM Jan Lukavský <je...@seznam.cz> wrote:

> Hi Amit,
>
> answers inline.
> On 4/1/20 12:23 AM, amit kumar wrote:
>
> Thanks Ankur for your reply.
>
> By default the allowed lateness for a global window is zero but we can
> also set  it to be non-zero which will be used in the downstream transforms
> where group by or window into with trigger is happening ?
>  (using allowedTimeStampSkew for unbounded sources/ sources which have
> timestamped elements).
>
> Setting allowedLateness for global window has no semantic meaning, because
> global window will be triggered (using default trigger) only at the end of
> input. Allowed lateness plays no role in that for global window.
>
> allowedTimestampSkew is used for something different, it is used when you
> reassign timestamps to elements which already have timestamps (e.g.
> assigned by source) and you want to move them into past. The skew says how
> far in the past you can go.
>
>
> In both scenarios which I described earlier for *source transforms* is it
> possible that the pipeline will drop data if I do not
> specify allowedTimeStampSkew/ allowedLateness at the source
> transforms(given I have late arriving data)? Can I just set allowed
> lateness in the transform where I do groupBy or windowInto rather than
> source.
>
> AllowedLateness is parameter of stateful operation (e.g. GroupByKey) not
> the source. The source emits _watermarks_, which marks progress in event
> time, but the data is then handled in the stateful operator. Each operator
> can have its own allowedLateness (although the model ensures that the
> lateness is by default inherited from one operator to the other). Sources
> should simply assign elements to global windows (with no allowed lateness,
> as allowed lateness has no meaning for global windows as mentioned above).
>
>
> In case of TextIO.read which reads from a bounded source and I assign
> Timestamps to all elements in the second transform, will it be useful in
> this case as well to set allowedTimeStampSkew after assigning timestamps? I
> am trying to understand how the elements will be available after assigning
> timestamps (Given all files are present on file system), will they be
> ordered by timestamp, can some elements be read after watermark has
> progressed above an element's event time  ?
>
> When executing batch pipeline, there is actually no watermark. Event time
> moves discretely from -inf (computation not finished yet) to +inf
> (computation finished). In the case you describe, you should not even need
> to set allowedTimestampSkew, because elements output from TextIO should
> (probably) be assigned timestamp of BoundedWindow.TIMESTAMP_MIN_VALUE (I'm
> not sure if the model guarantees this, but it seems reasonable). You can
> then reassign timestamps to the future as you wish. You don't have to worry
> about allowed lateness either, because that only applies to streaming
> pipelines, where event time moves more smoothly. By the definition of how
> event time progresses in case of batch pipelines, there is no "late" (after
> watermark) data in this case.
>

Clarification: sources should assign elements to their upstream window
(similar to DoFns), generally with the appropriate timestamp (unless they
are timestamp aware). The upstream of a bounded source is typically
Impulse, which is in the global window with MIN_TIMESTAMP, but could be
different. This better unifies the case of reading the elements from a set
of filenames published to pubsub, for example.


Re: Default WindowFn for Unbounded source

Posted by Jan Lukavský <je...@seznam.cz>.
Hi Amit,

answers inline.

On 4/1/20 12:23 AM, amit kumar wrote:
> Thanks Ankur for your reply.
>
> By default the allowed lateness for a global window is zero but we can 
> also set  it to be non-zero which will be used in the downstream 
> transforms where group by or window into with trigger is happening ?
>  (using allowedTimeStampSkew for unbounded sources/ sources which have 
> timestamped elements).

Setting allowedLateness for the global window has no semantic meaning,
because the global window will be triggered (using the default trigger) only
at the end of input. Allowed lateness plays no role in that for the global
window.
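
To make the point about global windows concrete, here is a minimal sketch of the usual expiry rule: late data for a window is dropped once the watermark has passed the window's maximum timestamp plus the allowed lateness. This is a simplified model in plain Java, not Beam code. The class LatenessSketch, the method isExpired and the constant GLOBAL_WINDOW_MAX_MILLIS are hypothetical names chosen for illustration.

```java
final class LatenessSketch {
    // Hypothetical stand-in for the end of the global window: far enough in
    // the future that no realistic watermark reaches it before input ends.
    public static final long GLOBAL_WINDOW_MAX_MILLIS = Long.MAX_VALUE / 2;

    // A window is treated as expired (late elements dropped) once the
    // watermark is past the window's max timestamp plus allowed lateness.
    // Assumes allowedLatenessMillis is non-negative.
    public static boolean isExpired(
            long windowMaxMillis, long allowedLatenessMillis, long watermarkMillis) {
        // Saturating add so a very large window end cannot overflow.
        long limit = windowMaxMillis > Long.MAX_VALUE - allowedLatenessMillis
                ? Long.MAX_VALUE
                : windowMaxMillis + allowedLatenessMillis;
        return watermarkMillis > limit;
    }

    public static void main(String[] args) {
        long watermark = 120_000L;      // watermark at 2 minutes
        long fixedWindowMax = 59_999L;  // one-minute fixed window [0s, 60s)

        // Fixed window: allowed lateness changes the outcome.
        System.out.println(isExpired(fixedWindowMax, 0L, watermark));        // true
        System.out.println(isExpired(fixedWindowMax, 300_000L, watermark));  // false

        // Global window: not expired either way while input is still flowing.
        System.out.println(isExpired(GLOBAL_WINDOW_MAX_MILLIS, 0L, watermark));        // false
        System.out.println(isExpired(GLOBAL_WINDOW_MAX_MILLIS, 300_000L, watermark));  // false
    }
}
```

In this model the allowed lateness only matters for windows that end before the input does, which is why it makes a difference for the fixed window and none for the global one.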

allowedTimestampSkew is used for something different: it applies when
you reassign timestamps to elements that already have timestamps (e.g.
assigned by the source) and you want to move them into the past. The skew
says how far into the past you can go.
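
The skew rule described above can be sketched as a simple bounds check. This is an illustrative model in plain Java under the assumption that timestamps are epoch milliseconds well inside the long range; the class TimestampSkewSketch and the method reassign are hypothetical names and are not part of the Beam API.

```java
final class TimestampSkewSketch {
    // Returns the new timestamp if the move is allowed, otherwise throws.
    // Moving a timestamp forward is always accepted; moving it backward is
    // accepted only within allowedSkewMillis of the current timestamp.
    public static long reassign(long currentMillis, long newMillis, long allowedSkewMillis) {
        if (newMillis < currentMillis - allowedSkewMillis) {
            throw new IllegalArgumentException(
                    "new timestamp " + newMillis + " is more than " + allowedSkewMillis
                            + " ms before the current timestamp " + currentMillis);
        }
        return newMillis;
    }

    public static void main(String[] args) {
        // Forward move: fine even with zero skew.
        System.out.println(reassign(10_000L, 12_000L, 0L));     // 12000
        // Backward move of 500 ms with 1000 ms skew: accepted.
        System.out.println(reassign(10_000L, 9_500L, 1_000L));  // 9500
        // Backward move of 2000 ms with 1000 ms skew: rejected.
        try {
            reassign(10_000L, 8_000L, 1_000L);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```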

>
> In both scenarios which I described earlier for *source transforms* is 
> it possible that the pipeline will drop data if I do not 
> specify allowedTimeStampSkew/ allowedLateness at the source 
> transforms(given I have late arriving data)? Can I just set allowed 
> lateness in the transform where I do groupBy or windowInto rather than 
> source.
AllowedLateness is a parameter of a stateful operation (e.g. GroupByKey), not
of the source. The source emits _watermarks_, which mark progress in event
time, but the data is then handled in the stateful operator. Each
operator can have its own allowedLateness (although the model ensures
that the lateness is by default inherited from one operator to the
next). Sources should simply assign elements to global windows (with no
allowed lateness, as allowed lateness has no meaning for global windows,
as mentioned above).
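
As a rough illustration of the inheritance described above, the sketch below walks a linear pipeline and computes the allowed lateness each step sees: the source starts at zero, a step that re-windows may set its own value, and every later step inherits the most recent one. This is a toy model in plain Java, not how Beam represents windowing strategies; the class LatenessInheritanceSketch and the method effectiveLateness are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalLong;

final class LatenessInheritanceSketch {
    // Each entry is one step of a linear pipeline. A present value means the
    // step re-windows its input and sets its own allowed lateness (in ms);
    // an empty value means the step inherits whatever is upstream.
    public static List<Long> effectiveLateness(List<OptionalLong> overrides) {
        List<Long> result = new ArrayList<>();
        long current = 0L; // the source starts with zero allowed lateness
        for (OptionalLong override : overrides) {
            if (override.isPresent()) {
                current = override.getAsLong();
            }
            result.add(current);
        }
        return result;
    }

    public static void main(String[] args) {
        List<OptionalLong> steps = List.of(
                OptionalLong.empty(),        // source read
                OptionalLong.of(60_000L),    // window-into with 1 min lateness
                OptionalLong.empty());       // group-by inherits 1 min
        System.out.println(effectiveLateness(steps)); // [0, 60000, 60000]
    }
}
```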
>
> In case of TextIO.read which reads from a bounded source and I assign 
> Timestamps to all elements in the second transform, will it be useful 
> in this case as well to set allowedTimeStampSkew after assigning 
> timestamps? I am trying to understand how the elements will be 
> available after assigning timestamps (Given all files are present on 
> file system), will they be ordered by timestamp, can some elements be 
> read after watermark has progressed above an element's event time  ?
When executing a batch pipeline, there is actually no watermark. Event
time moves discretely from -inf (computation not finished yet) to +inf
(computation finished). In the case you describe, you should not even
need to set allowedTimestampSkew, because elements output from TextIO
should (probably) be assigned the timestamp
BoundedWindow.TIMESTAMP_MIN_VALUE (I'm not sure if the model guarantees
this, but it seems reasonable). You can then reassign timestamps to the
future as you wish. You don't have to worry about allowed lateness
either, because that only applies to streaming pipelines, where event
time moves more smoothly. By the definition of how event time progresses
in batch pipelines, there is no "late" (after watermark) data in
this case.
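
The difference between the two execution modes can be sketched with a toy watermark. In the batch-style model the watermark stays at -infinity until all input is consumed, so no element is ever behind it. In the streaming-style model the watermark is assumed, purely for illustration, to follow the largest timestamp seen so far; real sources estimate their watermarks differently. The class BatchWatermarkSketch and the method countLate are hypothetical names in plain Java.

```java
final class BatchWatermarkSketch {
    // Counts elements whose timestamp is behind the watermark at the moment
    // they are read, given timestamps in arrival (not event-time) order.
    public static int countLate(long[] timestampsInArrivalOrder, boolean batch) {
        long watermark = Long.MIN_VALUE; // -infinity
        int late = 0;
        for (long ts : timestampsInArrivalOrder) {
            if (ts < watermark) {
                late++;
            }
            if (!batch) {
                // Streaming-style assumption: watermark tracks the max seen.
                watermark = Math.max(watermark, ts);
            }
            // Batch-style: watermark stays at -infinity until input ends.
        }
        return late;
    }

    public static void main(String[] args) {
        long[] outOfOrder = {10L, 30L, 20L, 40L, 5L};
        System.out.println(countLate(outOfOrder, false)); // 2 (20 and 5 arrive late)
        System.out.println(countLate(outOfOrder, true));  // 0
    }
}
```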

Re: Default WindowFn for Unbounded source

Posted by amit kumar <ak...@gmail.com>.
Thanks Ankur for your reply.

By default the allowed lateness for a global window is zero, but can we also
set it to a non-zero value that will then be used in the downstream
transforms where a group-by or a window-into with a trigger happens
(using allowedTimestampSkew for unbounded sources or sources that have
timestamped elements)?

In both scenarios that I described earlier for *source transforms*, is it
possible that the pipeline will drop data if I do not specify
allowedTimestampSkew/allowedLateness at the source transforms (given that I
have late-arriving data)? Can I just set allowed lateness in the transform
where I do the groupBy or windowInto rather than at the source?

In the case of TextIO.read, which reads from a bounded source, where I assign
timestamps to all elements in the second transform, will it be useful to
set allowedTimestampSkew after assigning timestamps as well? I am trying to
understand how the elements will be available after assigning timestamps
(given that all files are present on the file system): will they be ordered
by timestamp, and can some elements be read after the watermark has
progressed past an element's event time?


TextIO.Read
     |  Bounded source
     |  Global Window
     |  -infinity watermark
apply
WithTimestamps (based on a timestamp attribute in the file)
     |  Timestamped elements (watermark starts from -infinity and follows
     |  the timestamp from the timestamp attribute)
     |  Global Window


Regards,
Amit

On Tue, Mar 31, 2020 at 11:26 AM Ankur Goenka <go...@google.com> wrote:

> Hi Amit,
>
> As you don't have any GroupByKey or trigger in your pipeline, you don't
> need to do allowed lateness.
> For unbounded source, Global window will never fire a trigger or emit
> GroupByKey.
> In the code you linked, a trigger is used which uses allowedLateness.
>
> Thanks,
> Ankur

Re: Default WindowFn for Unbounded source

Posted by Ankur Goenka <go...@google.com>.
Hi Amit,

As you don't have any GroupByKey or trigger in your pipeline, you don't
need to set allowed lateness.
For an unbounded source, the global window (with the default trigger) will
never fire a trigger or emit GroupByKey output.
In the code you linked, a trigger is used, and that trigger uses
allowedLateness.

Thanks,
Ankur

On Tue, Mar 31, 2020 at 11:20 AM amit kumar <ak...@gmail.com> wrote:

> Thanks Jan!
> I have a question based on this on Global Window and allowed lateness,
> with default trigger for the following
>  scenarios:
>
> Case 1-
> TextIO.Read.
>      |. Bounded source
>      |. Global Window
>      |.  -infinity watermark
> apply
> WithTimeStamps (Based on a timestamp attribute in file)
>    |.   timestamped elements (watermark starts from -infinity and follows
> the timestamp from timestamp attribute)
>    |.   Global Window
>    |. (Will I never need to do allowedLateness in this case with default
> trigger? Will there be any benefit since the window is global and watermark
> will pass the end of window when everything is processed ?  )
>
>
> Case 2 -
> KinesisIO.read
>     | .Unbounded Source
>     |. Default Global Window
>     |. watermark based on arrival time
>  apply
> WithTimeStamps (Based on a timestamp attribute from the stream)
>    |.   timestamped elements  ( watermark follows the timestamp from
> timestamp attribute)
>    |.   Global Window
>    |. Watermark based on event timestamp.
>    | Same question here will there be any benefit of using
> allowedLateness since window is global ?
>
> In the code example below allowedLateness is used for global window ?
>
> https://github.com/apache/beam/blob/828b897a2439437d483b1bd7f2a04871f077bde0/examples/java/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java#L307
>
> Regards,
> Amit

Re: Default WindowFn for Unbounded source

Posted by amit kumar <ak...@gmail.com>.
Thanks Jan!
Based on this, I have a question about the global window and allowed
lateness with the default trigger, for the following scenarios:

Case 1 -
TextIO.Read
     |  Bounded source
     |  Global Window
     |  -infinity watermark
apply
WithTimestamps (based on a timestamp attribute in the file)
     |  Timestamped elements (watermark starts from -infinity and follows
     |  the timestamp from the timestamp attribute)
     |  Global Window
     |  (Will I never need to set allowedLateness in this case with the
     |  default trigger? Will there be any benefit, since the window is
     |  global and the watermark will pass the end of the window when
     |  everything is processed?)


Case 2 -
KinesisIO.read
     |  Unbounded source
     |  Default Global Window
     |  Watermark based on arrival time
apply
WithTimestamps (based on a timestamp attribute from the stream)
     |  Timestamped elements (watermark follows the timestamp from the
     |  timestamp attribute)
     |  Global Window
     |  Watermark based on event timestamp
     |  (Same question here: will there be any benefit to using
     |  allowedLateness, since the window is global?)

Is allowedLateness being used for a global window in the code example below?
https://github.com/apache/beam/blob/828b897a2439437d483b1bd7f2a04871f077bde0/examples/java/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java#L307

Regards,
Amit

On Tue, Mar 31, 2020 at 2:34 AM Jan Lukavský <je...@seznam.cz> wrote:

> Hi Amit,
>
> the window function applied by default is
> WindowingStrategy.globalDefault(), [1] - global window with zero allowed
> lateness.
>
> Cheers,
>
>   Jan
>
> [1]
>
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowingStrategy.java#L105

Re: Default WindowFn for Unbounded source

Posted by Jan Lukavský <je...@seznam.cz>.
Hi Amit,

By default, the windowing applied is
WindowingStrategy.globalDefault() [1]: a global window with zero allowed
lateness.

Cheers,

  Jan

[1] 
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowingStrategy.java#L105

On 3/31/20 10:22 AM, amit kumar wrote:
> Hi All,
>
> Is there a default WindowFn that gets applied to elements of an 
> unbounded source.
>
> For example, if I have a Kinesis input source ,for which all elements 
> are timestamped with ArrivalTime, what will be the default windowing 
> applied to the output of read transform ?
>
> Is this runner dependent ?
>
> Regards,
> Amit