Posted to user@flink.apache.org by Gregory Fee <gf...@lyft.com> on 2018/03/22 23:02:21 UTC

Rowtime

Hello! Even though I am processing in event time and provide an event-time
timestamp for all my events, the events produced in a RetractStream that I
create from a Table do not have timestamps. That is, I put a ProcessFunction
on the RetractStream and ctx.timestamp() always returns null. I went a step
further and defined the table to include a rowtime column, but that doesn't
seem to make any difference. My SQL on the Table is roughly just:

select user, count(item) from table group by user

Am I missing something? Is there any way to get a reasonable event
timestamp on the events in the retract stream?

-- 
*Gregory Fee*
Engineer
425.830.4734 <+14258304734>

Re: Rowtime

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Gregory,

Event-time timestamps are handled a bit differently in Flink's SQL compared
to the DataStream API.
In the DataStream API, timestamps are hidden from the user and implicitly
used for time-based operations such as windows.
In SQL, the query semantics cannot depend on hidden fields. Therefore, all
time fields (event-time but also processing time) must be part of the
schema of a table.
Internally, the SQL operators remove the hidden DataStream record timestamp
and move it into the record itself.

When a Table is converted into a DataStream, one of three things may happen:
1) the Table has no event-time field: the resulting DataStream will not
have a record timestamp. This is your case.
2) the Table has exactly one event-time field: the record timestamp of the
resulting DataStream will be set from the event-time field.
3) the Table has more than one event-time field: the conversion fails and
you need to cast all but one of the event-time fields into regular TIMESTAMP
fields.
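
To make the three cases concrete, here is a small stand-alone Java model of
the rule. It is not Flink code and does not call any Flink API; the names
RowtimeConversionSketch, ColumnKind and recordTimestampColumn are made up for
illustration, and the exception type is an assumption.

```java
import java.util.Arrays;
import java.util.List;

// Toy model of the Table -> DataStream timestamp rule described above.
// NOT Flink code: all names here are hypothetical.
class RowtimeConversionSketch {

    enum ColumnKind { REGULAR, EVENT_TIME }

    /**
     * Returns the index of the column that would supply the record timestamp,
     * or -1 if the schema has no event-time column (case 1).
     * Throws if the schema has more than one event-time column (case 3).
     */
    static int recordTimestampColumn(List<ColumnKind> schema) {
        int found = -1;
        for (int i = 0; i < schema.size(); i++) {
            if (schema.get(i) == ColumnKind.EVENT_TIME) {
                if (found >= 0) {
                    throw new IllegalStateException(
                        "more than one event-time column; cast all but one to TIMESTAMP");
                }
                found = i;
            }
        }
        return found;
    }

    public static void main(String[] args) {
        // Case 1: (user, cnt) has no event-time column, so no record timestamp.
        System.out.println(recordTimestampColumn(
            Arrays.asList(ColumnKind.REGULAR, ColumnKind.REGULAR)));
        // Case 2: (user, cnt, rowtime) takes the timestamp from the third column.
        System.out.println(recordTimestampColumn(
            Arrays.asList(ColumnKind.REGULAR, ColumnKind.REGULAR, ColumnKind.EVENT_TIME)));
    }
}
```

In this model, the result of "select user, count(item) ... group by user"
corresponds to case 1, because neither output column is an event-time field.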

So usually, you would need to define a table with an event-time attribute
[1] and just forward it in the SELECT clause.

However, your case is a bit more complicated.
Your query is performing a non-windowed aggregation, which is not a
time-based operation but causes updates/retractions.
Therefore, it is a bit more tricky to preserve the event-time timestamps
because we must ensure that they are still aligned with the watermarks.
Two queries would ensure watermark alignment:
- SELECT user, COUNT(*), MAX(rowtime) FROM t GROUP BY user;
- SELECT user, COUNT(*), LAST_VAL(rowtime) FROM t GROUP BY user;

These queries would forward the maximum (or last) event-time timestamp
(rowtime) to the result table.
However, neither of these works in the current or the upcoming version of
Flink.
We also need to think about how timestamps would interact with retractions,
because retractions should not be treated as late records.
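
As a rough illustration of what the MAX(rowtime) query above would compute,
here is a stand-alone Java sketch of a non-windowed count per user that
emits retract-style messages carrying the maximum rowtime seen so far. It is
a simulation only, not Flink code; the class RetractMaxRowtimeSketch, the
Change type and the onEvent method are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simulates: SELECT user, COUNT(*), MAX(rowtime) FROM t GROUP BY user
// as a retract stream. NOT Flink code: all names here are hypothetical.
class RetractMaxRowtimeSketch {

    // One retract-stream message: add == true is an insert, false a retraction.
    static final class Change {
        final boolean add;
        final String user;
        final long count;
        final long maxRowtime;

        Change(boolean add, String user, long count, long maxRowtime) {
            this.add = add;
            this.user = user;
            this.count = count;
            this.maxRowtime = maxRowtime;
        }

        @Override
        public String toString() {
            return (add ? "+" : "-") + "(" + user + ", " + count + ", " + maxRowtime + ")";
        }
    }

    // Per-user accumulator: {count, maxRowtime}.
    private final Map<String, long[]> state = new HashMap<>();

    // Processes one input event and returns the messages it causes.
    List<Change> onEvent(String user, long rowtime) {
        List<Change> out = new ArrayList<>();
        long[] acc = state.get(user);
        if (acc == null) {
            acc = new long[] {0L, Long.MIN_VALUE};
            state.put(user, acc);
        } else {
            // Retract the previously emitted result, with its old timestamp.
            out.add(new Change(false, user, acc[0], acc[1]));
        }
        acc[0] += 1;
        acc[1] = Math.max(acc[1], rowtime);
        out.add(new Change(true, user, acc[0], acc[1]));
        return out;
    }

    public static void main(String[] args) {
        RetractMaxRowtimeSketch agg = new RetractMaxRowtimeSketch();
        for (Change c : agg.onEvent("alice", 1000L)) System.out.println(c);
        for (Change c : agg.onEvent("alice", 900L)) System.out.println(c);
        for (Change c : agg.onEvent("alice", 1500L)) System.out.println(c);
    }
}
```

In this sketch each new result carries a timestamp at least as large as the
event that triggered it, while the retraction repeats the timestamp of the
result it withdraws. That older timestamp is the kind of value that should
not be mistaken for a late record.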

Best, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/streaming.html#event-time

