Posted to user@beam.apache.org by Carlos Alonso <ca...@mrcalonso.com> on 2018/02/07 22:55:13 UTC

Lateness droppings debugging

Hi everyone!!

I have a streaming job running with fixed windows of one hour and an
allowed lateness of two days, and the number of elements dropped due to
lateness is slowly but continuously growing. I'd like to understand which
elements those are.

I'd like to get the watermark from inside the job to compare it against
each element and write log messages with the ones that will potentially be
discarded... Does that approach make any sense? In which case... how can I
get the watermark from inside the job? Any other ideas?

Thanks in advance!!

Re: Lateness droppings debugging

Posted by Raghu Angadi <ra...@google.com>.
What are the advantages of holding back the watermark over allowed lateness
(the duration of both being approximately the same)?

On Thu, Feb 8, 2018 at 1:38 PM, Robert Bradshaw <ro...@google.com> wrote:

> [quoted thread trimmed]

Re: Lateness droppings debugging

Posted by Carlos Alonso <ca...@mrcalonso.com>.
We’re setting the timestamp to the date of last update of each document.
That’s why I think pushing everything to PubSub before starting the job
could work.

The data is read from CouchDB's changes feed into PubSub and the idea is to
transfer everything into GCS from the very beginning. Aside from that, we
have another job running "live" with live data.

Any other idea/suggestion?

Thanks!!
On Thu, 8 Feb 2018 at 22:39, Robert Bradshaw <ro...@google.com> wrote:

> [quoted thread trimmed]

Re: Lateness droppings debugging

Posted by Robert Bradshaw <ro...@google.com>.
You can set the timestamp attribute of your pubsub messages which will
hold back the watermark, see

https://beam.apache.org/documentation/sdks/javadoc/2.2.0/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.Read.html#withTimestampAttribute-java.lang.String-

However, if you're mixing historical and live data, it may make more
sense to read these as two separate sources (e.g. the static data from
a set of files, the live data from pubsub) and then flatten them for
further processing.
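
A minimal sketch of both suggestions, assuming the Java SDK and a Pipeline
named 'p'; the subscription, attribute name, and GCS path are made-up
placeholders:

    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
    import org.apache.beam.sdk.transforms.Flatten;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionList;

    // Live data: the watermark is held back based on the chosen timestamp
    // attribute of the messages still pending in the subscription.
    PCollection<String> live = p.apply("ReadLive",
        PubsubIO.readStrings()
            .fromSubscription("projects/my-project/subscriptions/changes")
            .withTimestampAttribute("lastUpdated"));

    // Historical data: read from files rather than replaying via pubsub.
    PCollection<String> historical = p.apply("ReadHistorical",
        TextIO.read().from("gs://my-bucket/backfill/*.json"));

    // Flatten both sources for further processing.
    PCollection<String> all = PCollectionList.of(historical).and(live)
        .apply(Flatten.pCollections());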

On Thu, Feb 8, 2018 at 1:23 PM, Carlos Alonso <ca...@mrcalonso.com> wrote:
> [quoted thread trimmed]

Re: Lateness droppings debugging

Posted by Carlos Alonso <ca...@mrcalonso.com>.
I thought of loading all of it into the PubSub subscription before starting
the job. That should work, right? Any better suggestion?
On Thu, 8 Feb 2018 at 22:23, Carlos Alonso <ca...@mrcalonso.com> wrote:

> [quoted thread trimmed]

Re: Lateness droppings debugging

Posted by Carlos Alonso <ca...@mrcalonso.com>.
Yes, the data is finite (although it comes through PubSub, so I guess it is
considered unbounded).
How could I hold the watermark and prevent it from moving?

Thanks!

On Thu, Feb 8, 2018 at 10:06 PM Robert Bradshaw <ro...@google.com> wrote:

> [quoted thread trimmed]

Re: Lateness droppings debugging

Posted by Robert Bradshaw <ro...@google.com>.
Where is the watermark for this old data coming from? Rather than
messing with allowed lateness, would it be possible to hold the
watermark back appropriately during the time you're injecting old data
(assuming there's only a finite amount of it)?

On Thu, Feb 8, 2018 at 12:56 PM, Carlos Alonso <ca...@mrcalonso.com> wrote:
> [quoted thread trimmed]

Re: Lateness droppings debugging

Posted by Carlos Alonso <ca...@mrcalonso.com>.
Thanks for your responses!!

I have a scenario where I have to reprocess 4 or 5 years of very disordered
data and I don't want to lose any of it. I'm thinking of setting a very big
allowed lateness (5 years), but before doing that I'd like to understand
the consequences that may have. I guess memory-wise it will be very
consuming, as no window will ever expire, but I guess I could overcome that
with brute force (many machines with lots of RAM). Are there more concerns
I should be aware of? This should be a one-off thing.

Thanks!
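
For reference, this is roughly the windowing I have in mind; just a sketch,
where 'docs' and 'Doc' are placeholders for my actual collection and
element type:

    import org.apache.beam.sdk.transforms.windowing.FixedWindows;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.beam.sdk.values.PCollection;
    import org.joda.time.Duration;

    // Same 1-hour fixed windows, but with an allowed lateness that covers
    // the whole backfill range so nothing is ever dropped. Each window's
    // state stays live until the watermark passes its end plus 5 years.
    PCollection<Doc> windowed = docs.apply(
        Window.<Doc>into(FixedWindows.of(Duration.standardHours(1)))
            .withAllowedLateness(Duration.standardDays(5 * 365)));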

On Thu, Feb 8, 2018 at 6:59 PM Raghu Angadi <ra...@google.com> wrote:

> [quoted thread trimmed]

Re: Lateness droppings debugging

Posted by Raghu Angadi <ra...@google.com>.
On Wed, Feb 7, 2018 at 10:33 PM, Pawel Bartoszek <pawelbartoszek89@gmail.com> wrote:

> Hi Raghu,
> Can you provide more details about increasing allowed lateness? Even if I
> do that, don't I still need to compare the event time of each record with
> processing time (system current time) in my ParDo?
>

I see. The PaneInfo() associated with each element has a 'Timing' enum, so
we can tell if the element is late, but it does not tell how late.
How about this: we can have a periodic timer firing every minute and store
the scheduled time of the timer in state as the watermark time. We could
compare element time to this stored time for a good approximation (it may
require a parallel stage with a global window, dropping any events that are
'clearly within limits' based on current time). There are probably other
ways to do this with timers within the existing stage.
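
A rough, untested sketch of that idea as a stateful DoFn; it assumes a
keyed PCollection<KV<String, String>> in the global window, and all the
names are made up:

    import org.apache.beam.sdk.state.StateSpec;
    import org.apache.beam.sdk.state.StateSpecs;
    import org.apache.beam.sdk.state.TimeDomain;
    import org.apache.beam.sdk.state.Timer;
    import org.apache.beam.sdk.state.TimerSpec;
    import org.apache.beam.sdk.state.TimerSpecs;
    import org.apache.beam.sdk.state.ValueState;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;
    import org.joda.time.Duration;
    import org.joda.time.Instant;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // Keeps an approximate watermark in state via a looping event-time
    // timer: when the timer fires, the watermark has passed its scheduled
    // time, so that time is a usable lower bound on the watermark.
    class LateDataProbeFn extends DoFn<KV<String, String>, KV<String, String>> {
      private static final Logger LOG =
          LoggerFactory.getLogger(LateDataProbeFn.class);
      private static final Duration ALLOWED_LATENESS = Duration.standardDays(2);

      @TimerId("tick")
      private final TimerSpec tickSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

      @StateId("started")
      private final StateSpec<ValueState<Boolean>> startedSpec = StateSpecs.value();

      @StateId("approxWm")
      private final StateSpec<ValueState<Instant>> wmSpec = StateSpecs.value();

      @ProcessElement
      public void process(ProcessContext c,
          @TimerId("tick") Timer tick,
          @StateId("started") ValueState<Boolean> started,
          @StateId("approxWm") ValueState<Instant> wm) {
        if (started.read() == null) {
          started.write(true);
          tick.set(c.timestamp().plus(Duration.standardMinutes(1)));
        }
        Instant approx = wm.read();
        // Flag elements more than ALLOWED_LATENESS behind our estimate.
        if (approx != null
            && c.timestamp().isBefore(approx.minus(ALLOWED_LATENESS))) {
          LOG.info("Likely dropped downstream: {} @ {}",
              c.element(), c.timestamp());
        }
        c.output(c.element());
      }

      @OnTimer("tick")
      public void onTick(OnTimerContext c,
          @TimerId("tick") Timer tick,
          @StateId("approxWm") ValueState<Instant> wm) {
        Instant firedAt = c.timestamp();  // watermark has passed this point
        Instant current = wm.read();
        if (current == null || firedAt.isAfter(current)) {
          wm.write(firedAt);
        }
        tick.set(firedAt.plus(Duration.standardMinutes(1)));  // keep looping
      }
    }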


> [quoted thread trimmed]

Re: Lateness droppings debugging

Posted by Pawel Bartoszek <pa...@gmail.com>.
Hi Raghu,
Can you provide more details about increasing allowed lateness? Even if I
do that, don't I still need to compare the event time of each record with
processing time (system current time) in my ParDo?

Pawel

On 8 February 2018 at 05:40, Raghu Angadi <ra...@google.com> wrote:

> [quoted thread trimmed]

Re: Lateness droppings debugging

Posted by Kenneth Knowles <kl...@google.com>.
Hi Carlos,

I wanted to mention that elements are not discarded by comparing their
timestamp with the watermark. They are discarded when their window has
expired, that is, when the watermark has passed the end of the window plus
the allowed lateness. An element just has to arrive at an aggregation, such
as GBK, Combine, or stateful ParDo, before the window has expired. This
drops a lot less data and is a bit less arbitrary than comparing element
timestamp versus watermark.
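
To make that concrete, here is roughly the arithmetic for your setup; a
sketch, with a made-up element timestamp:

    import org.apache.beam.sdk.transforms.windowing.FixedWindows;
    import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
    import org.joda.time.Duration;
    import org.joda.time.Instant;

    // 1-hour fixed windows with 2 days of allowed lateness, as in your job.
    FixedWindows windowing = FixedWindows.of(Duration.standardHours(1));
    Duration allowedLateness = Duration.standardDays(2);

    Instant elementTs = Instant.parse("2018-02-05T10:30:00Z");  // hypothetical
    IntervalWindow window = windowing.assignWindow(elementTs);
    Instant expiry = window.maxTimestamp().plus(allowedLateness);
    // The element is kept as long as it arrives before the watermark passes
    // 'expiry' (about 2018-02-07T11:00Z here), however late it is itself.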

I wish I had a better answer for how to gain insights into the data being
dropped. That's something we should think about.

Kenn

On Wed, Feb 7, 2018 at 10:30 PM, Carlos Alonso <ca...@mrcalonso.com> wrote:

> [quoted thread trimmed]

Re: Lateness droppings debugging

Posted by Carlos Alonso <ca...@mrcalonso.com>.
Cool, I'll try some of these. Thanks Raghu!

On Thu, Feb 8, 2018 at 6:40 AM Raghu Angadi <ra...@google.com> wrote:

> [quoted thread trimmed]

Re: Lateness droppings debugging

Posted by Raghu Angadi <ra...@google.com>.
The watermark is not directly available; you essentially infer it from
fired triggers (e.g. fixed windows). I would consider some of these options
(the last one is sketched below):
  - Ad hoc debugging: if the pipeline is close to realtime, you can just
guess whether an element will be dropped based on its timestamp and the
current time in the first stage (before the first aggregation).
  - Increase allowed lateness (say to 3 days) and drop the elements you
notice are later than 1 day yourself.
  - Place the elements into another window with larger allowed lateness and
log very late elements in another parallel aggregation (see
TriggerExample.java in the Beam repo).
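
For the last option, a minimal sketch of flagging late panes downstream of
that parallel aggregation (the names are made up; TriggerExample.java shows
the full pattern):

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.windowing.PaneInfo;
    import org.apache.beam.sdk.values.KV;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // After an aggregation windowed with a larger allowed lateness, panes
    // with LATE timing came from data behind the watermark; log them.
    class LogLatePanesFn extends DoFn<KV<String, Long>, KV<String, Long>> {
      private static final Logger LOG =
          LoggerFactory.getLogger(LogLatePanesFn.class);

      @ProcessElement
      public void process(ProcessContext c) {
        if (c.pane().getTiming() == PaneInfo.Timing.LATE) {
          LOG.info("Late pane at {}: {}", c.timestamp(), c.element());
        }
        c.output(c.element());
      }
    }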

On Wed, Feb 7, 2018 at 2:55 PM, Carlos Alonso <ca...@mrcalonso.com> wrote:

> [quoted thread trimmed]