Posted to issues@beam.apache.org by "Yichi Zhang (Jira)" <ji...@apache.org> on 2021/01/30 05:23:00 UTC
[jira] [Updated] (BEAM-10329) Dataflow runner does not reserve
timestamp of Create.Timestamped() in batch stateful dofn
[ https://issues.apache.org/jira/browse/BEAM-10329?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Yichi Zhang updated BEAM-10329:
-------------------------------
Resolution: Fixed
Status: Resolved (was: Resolved)
> Dataflow runner does not reserve timestamp of Create.Timestamped() in batch stateful dofn
> -----------------------------------------------------------------------------------------
>
> Key: BEAM-10329
> URL: https://issues.apache.org/jira/browse/BEAM-10329
> Project: Beam
> Issue Type: Bug
> Components: runner-dataflow
> Reporter: Yichi Zhang
> Assignee: Yichi Zhang
> Priority: P2
> Labels: stale-assigned
> Fix For: 2.23.0
>
> Time Spent: 1h
> Remaining Estimate: 0h
>
> When running a test pipeline such as
> {code:java}
> public void testTimestampedValue() throws Exception {
>   final String timerId = "foo";
>   DoFn<KV<String, Long>, KV<Long, Instant>> statefn =
>       new DoFn<KV<String, Long>, KV<Long, Instant>>() {
>         @TimerId(timerId)
>         private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
>         @ProcessElement
>         public void processElement(
>             @TimerId(timerId) Timer timer,
>             @Timestamp Instant timestamp,
>             OutputReceiver<KV<Long, Instant>> r) {
>           r.output(KV.of(3L, timestamp));
>         }
>         @OnTimer(timerId)
>         public void onTimer(@Timestamp Instant timestamp, OutputReceiver<KV<Long, Instant>> r) {
>           // Do nothing: whether the timer is involved makes no difference.
>         }
>       };
>   PCollection<KV<Long, Instant>> output =
>       pipeline
>           .apply(Create.timestamped(TimestampedValue.of(KV.of("hello", 37L), new Instant(123L))))
>           .apply(ParDo.of(statefn));
>   PAssert.that(output).containsInAnyOrder(KV.of(3L, new Instant(123L)));
>   pipeline.run();
> }
> {code}
> On Dataflow with the Fn API, the timestamp of the output does not match the one expected by PAssert.