Posted to issues@beam.apache.org by "Yichi Zhang (Jira)" <ji...@apache.org> on 2021/01/30 05:23:00 UTC

[jira] [Updated] (BEAM-10329) Dataflow runner does not reserve timestamp of Create.Timestamped() in batch stateful dofn

     [ https://issues.apache.org/jira/browse/BEAM-10329?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yichi Zhang updated BEAM-10329:
-------------------------------
    Resolution: Fixed
        Status: Resolved  (was: Resolved)

> Dataflow runner does not reserve timestamp of Create.Timestamped() in batch stateful dofn
> -----------------------------------------------------------------------------------------
>
>                 Key: BEAM-10329
>                 URL: https://issues.apache.org/jira/browse/BEAM-10329
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-dataflow
>            Reporter: Yichi Zhang
>            Assignee: Yichi Zhang
>            Priority: P2
>              Labels: stale-assigned
>             Fix For: 2.23.0
>
>          Time Spent: 1h
>  Remaining Estimate: 0h
>
> When running a test pipeline such as
> {code:java}
> public void testTimestampedValue() throws Exception {
>       final String timerId = "foo";
>       DoFn<KV<String, Long>, KV<Long, Instant>> statefn =
>           new DoFn<KV<String, Long>, KV<Long, Instant>>() {
>             @TimerId(timerId)
>             private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
>             @ProcessElement
>             public void processElement(
>                 @TimerId(timerId) Timer timer,
>                 @Timestamp Instant timestamp,
>                 OutputReceiver<KV<Long, Instant>> r) {
>               r.output(KV.of(3L, timestamp));
>             }
>             @OnTimer(timerId)
>             public void onTimer(@Timestamp Instant timestamp, OutputReceiver<KV<Long, Instant>> r) {
>               // Do nothing: the issue reproduces whether or not the timer is involved.
>             }
>           };
>       PCollection<KV<Long, Instant>> output =
>           pipeline
>               .apply(Create.timestamped(TimestampedValue.of(KV.of("hello", 37L), new Instant(123L))))
>               .apply(ParDo.of(statefn));
>       PAssert.that(output).containsInAnyOrder(KV.of(3L, new Instant(123L)));
>       pipeline.run();
> }
> {code}
> On Dataflow with the Fn API, the timestamp of the output does not match the one expected by PAssert (new Instant(123L)).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)