Posted to dev@beam.apache.org by Hakob Manukyan <hm...@questrade.com> on 2020/09/11 17:37:07 UTC

Usage of DoFn#getAllowedTimestampSkew - deprecated, refers to ticket BEAM-644

Dear BEAM Team,

I am trying to build and run a pipeline on Google Cloud Dataflow.

I have a situation where I need to set the event timestamp after parsing
the received custom message in a DoFn. I called context.outputWithTimestamp,
but it failed with an exception about the allowed skew:

 ERROR org.apache.beam.runners.dataflow.util.MonitoringUtil$LoggingHandler
- 2020-09-11T16:42:08.327Z: java.lang.IllegalArgumentException: Cannot
output with timestamp 2020-09-11T14:03:49.250Z. Output timestamps must be
no earlier than the timestamp of the current input
(2020-09-11T14:42:08.023Z) minus the allowed skew (0 milliseconds). See the
DoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed
skew.

I overrode getAllowedTimestampSkew() to return
Duration.millis(Long.MAX_VALUE), and it seems to work.
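The rule stated in the error message can be modeled in a few lines to see how far off this element was. This is a minimal sketch using only java.time (Beam itself uses Joda-Time types); the class and method names are hypothetical, and it mirrors the wording of the error message ("no earlier than the timestamp of the current input minus the allowed skew") rather than Beam's actual implementation.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical model of the check described in the error message:
// an output timestamp is accepted only if output >= input - allowedSkew.
final class SkewCheck {

  static boolean isAllowed(Instant input, Instant output, Duration allowedSkew) {
    Instant lowerBound = input.minus(allowedSkew);
    return !output.isBefore(lowerBound);
  }

  // Smallest skew that would have let this output through (zero if output >= input).
  static Duration requiredSkew(Instant input, Instant output) {
    Duration gap = Duration.between(output, input);
    return gap.isNegative() ? Duration.ZERO : gap;
  }

  public static void main(String[] args) {
    // Timestamps copied from the error message in the question.
    Instant input = Instant.parse("2020-09-11T14:42:08.023Z");
    Instant output = Instant.parse("2020-09-11T14:03:49.250Z");

    System.out.println(isAllowed(input, output, Duration.ZERO)); // false
    System.out.println(requiredSkew(input, output)); // PT38M18.773S
    // Mirrors the Long.MAX_VALUE override: the lower bound moves so far back that any output passes.
    System.out.println(isAllowed(input, output, Duration.ofMillis(Long.MAX_VALUE))); // true
  }
}
```

In other words, this particular element needed a skew of at least 38 minutes 18.773 seconds; returning Long.MAX_VALUE simply disables the check rather than sizing it to the data.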

The documentation and source files mark this method as deprecated and
refer to BEAM-644 (https://issues.apache.org/jira/browse/BEAM-644) as the
replacement, but that ticket is still open.

Is it safe to use/override getAllowedTimestampSkew()?

I am using 1-minute FixedWindows with an allowed lateness of 5 minutes.

Here are the details of the windowing:

apply(Window.<KV<String, Integer>>into(FixedWindows.of(Duration.standardMinutes(1)))
                        .triggering(AfterWatermark.pastEndOfWindow()
                            .withLateFirings(AfterPane.elementCountAtLeast(1)))
                        .withAllowedLateness(Duration.standardMinutes(5))
                        .accumulatingFiredPanes())
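With this configuration each element is assigned to a one-minute window by its event timestamp, so re-stamping an element also moves it to a different window. The sketch below computes the window for each of the two timestamps in the error message. It assumes windows aligned to the epoch with zero offset (the default for FixedWindows.of(size)); the class and method names are hypothetical and it uses java.time rather than Beam's own types.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper mirroring epoch-aligned fixed windows with zero offset.
final class FixedWindowSketch {

  // Start of the fixed window [start, start + size) that contains ts.
  static Instant windowStart(Instant ts, Duration size) {
    long sizeMs = size.toMillis();
    long startMs = Math.floorDiv(ts.toEpochMilli(), sizeMs) * sizeMs;
    return Instant.ofEpochMilli(startMs);
  }

  public static void main(String[] args) {
    Duration size = Duration.ofMinutes(1); // FixedWindows.of(1 minute)
    Instant input = Instant.parse("2020-09-11T14:42:08.023Z");
    Instant output = Instant.parse("2020-09-11T14:03:49.250Z");

    for (Instant ts : new Instant[] {input, output}) {
      Instant start = windowStart(ts, size);
      System.out.println(ts + " -> [" + start + ", " + start.plus(size) + ")");
    }
  }
}
```

Under these assumptions the output timestamp lands in the [14:03, 14:04) window, 39 one-minute windows before the [14:42, 14:43) window of the input element.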


Please advise how to deal with getAllowedTimestampSkew(), and what other
solutions I could consider.

Thanks in advance,
I appreciate your help.


-- 
Hakob Manukyan
C++ Technical Lead
www.questrade.com
Phone  416.227.9876 x5520
Toll free  1.888.783.7866
Fax  1.888.767.1731
Questrade, Inc.
5650 Yonge St., Suite 1700, Toronto, ON, M2M 4G3
Questrade, Inc. is a registered investment dealer and member of the
Investment Industry Regulatory Organization of Canada and the Canadian
Investor Protection Fund.
Questrade® is a registered trademark licensed to Questrade Financial Group
Inc.

Re: Usage of DoFn#getAllowedTimestampSkew - deprecated, refers to ticket BEAM-644

Posted by Luke Cwik <lc...@google.com>.
Your data can become droppably late if you output timestamps too far in the
past, where "too far" depends on the current watermark, the allowed
lateness, and whether your current record is already late.
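A simplified model of this drop decision: assume (as a sketch, not Beam's exact code, and ignoring millisecond boundary details) that an element becomes droppable once the watermark has passed the end of its window plus the allowed lateness. The window size and lateness come from the question; the class name, method names and both watermark values are hypothetical, chosen only for illustration.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical, simplified model: droppable once watermark > windowEnd + allowedLateness.
final class LatenessSketch {

  // End of the epoch-aligned fixed window containing ts.
  static Instant windowEnd(Instant ts, Duration size) {
    long sizeMs = size.toMillis();
    long startMs = Math.floorDiv(ts.toEpochMilli(), sizeMs) * sizeMs;
    return Instant.ofEpochMilli(startMs + sizeMs);
  }

  static boolean isDroppable(Instant ts, Instant watermark, Duration size, Duration allowedLateness) {
    Instant expiry = windowEnd(ts, size).plus(allowedLateness);
    return watermark.isAfter(expiry);
  }

  public static void main(String[] args) {
    Duration size = Duration.ofMinutes(1);     // FixedWindows.of(1 minute)
    Duration lateness = Duration.ofMinutes(5); // withAllowedLateness(5 minutes)
    Instant output = Instant.parse("2020-09-11T14:03:49.250Z");

    // Hypothetical watermark values, for illustration only.
    Instant earlyWatermark = Instant.parse("2020-09-11T14:05:00Z");
    Instant laterWatermark = Instant.parse("2020-09-11T14:40:00Z");

    System.out.println(isDroppable(output, earlyWatermark, size, lateness)); // false
    System.out.println(isDroppable(output, laterWatermark, size, lateness)); // true
  }
}
```

With the question's settings the re-stamped element's window expires around 14:09, so whether it survives depends entirely on where the watermark is when the element is emitted.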

So whether it is "safe" depends on whether your business logic allows the
data to be dropped. Some users want to attempt to ingest data as long as
the watermark is still behind, but are OK with that data being dropped if
it is really old.

Generally it is discouraged because it leads to questions like "where did
my data go?". We had envisioned a different transform to shift the
timestamps of records, but this was never completed[1].

1: https://issues.apache.org/jira/browse/BEAM-644
