Posted to dev@flink.apache.org by Konstantin Knauf <kn...@apache.org> on 2020/07/02 16:49:35 UTC

Re: [DISCUSS] [FLINK-16824] Creating Temporal Table Function via DDL

Hi everyone,

well, this got complicated :) Let me add my thoughts:

* Temporal Table Joins are already quite hard to understand for many users.
If need be, we should trade off for simplicity.

* The important case is the *event time* temporal join. In my understanding
processing time temporal joins are comparatively easy: no history tracking is
needed, etc.

* It seems that for regular upsert streams with an event time attribute,
everyone agrees that it works. There are only questions about multiple
event time attributes, which we could in my opinion postpone for future
work.

* For changelog streams, which specify an event time column explicitly, it
should be possible to use it for event time temporal tables. I understand
that deletion can not be handled properly, but we could - for example -
handle this exactly like an upsert stream, i.e. ignore deletions. This is a
limitation, but it is at least easy to understand and acceptable for many
use cases, I believe. Alternatively, one could also use the "ts_ms" for
deletion, which would always be larger than the event time.

CREATE TABLE currency_rates (
  id BIGINT,
  name STRING,
  rate DECIMAL(10, 5),
  time TIMESTAMP(3),
  WATERMARK FOR time AS ...
) WITH (
  'connector' = 'kafka',
  ...
  'format' = 'debezium-json'
)
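
To make the two options above concrete (ignore deletions, or close the last
version at the operation time of the DELETE), here is a minimal Python sketch.
It is not Flink code: the function names, the record layout and the integer
"tick" timestamps are all assumptions made for illustration only.

```python
from bisect import bisect_right

def build_versions(changelog, delete_mode="ignore"):
    """Return {key: [(valid_from, value_or_None), ...]} sorted by valid_from.

    Each record is a dict with the hypothetical fields op ('c', 'u', 'd'),
    key, value, event_time (from the row) and op_time (like Debezium's ts_ms).
    delete_mode="ignore" drops deletions (upsert-like behaviour);
    delete_mode="op_time" ends the last version at the delete's op_time.
    """
    versions = {}
    for rec in changelog:
        history = versions.setdefault(rec["key"], [])
        if rec["op"] in ("c", "u"):
            history.append((rec["event_time"], rec["value"]))
        elif rec["op"] == "d" and delete_mode == "op_time":
            history.append((rec["op_time"], None))  # None marks "no row"
    for history in versions.values():
        history.sort(key=lambda version: version[0])
    return versions


def lookup(versions, key, as_of):
    """Return the value valid for `key` at time `as_of`, or None."""
    history = versions.get(key, [])
    idx = bisect_right([start for start, _ in history], as_of) - 1
    return history[idx][1] if idx >= 0 else None


# Illustrative changelog: insert, update, then a much later delete.
CHANGELOG = [
    {"op": "c", "key": "Euro", "value": 114, "event_time": 1, "op_time": 2},
    {"op": "u", "key": "Euro", "value": 118, "event_time": 5, "op_time": 6},
    {"op": "d", "key": "Euro", "value": 118, "event_time": 5, "op_time": 100},
]
```

With delete_mode="ignore" a lookup after the delete still returns the last
rate; with delete_mode="op_time" the row is visible up to the operation time
of the delete and gone afterwards.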


* For changelog streams without an event time attribute (the more common
case?), it would be great if we can support temporal table joins based on
"ts_ms" (in the debezium case). One option could be to "simply" extract
"ts_ms" and make it possible to use it as an event time column. Then we
would again be in the above case. Thinking about it, this could even be
addressed in [1], which is also planned for Flink 1.12 as far as I know. *
This could look something like:

CREATE TABLE topic_products (
  id BIGINT,
  name STRING,
  description STRING,
  weight DECIMAL(10, 2),
  time TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  ...
  'format' = 'debezium-json',
  'timestamp' = 'time'
)
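
What "extracting ts_ms" could mean on a per-record level is sketched below in
Python. This is only an illustration under assumptions: the helper name
`to_row` and the sample record are made up, and the envelope fields
(before/after/op/ts_ms) follow the Debezium example quoted further down in
this thread.

```python
import json
from datetime import datetime, timezone

def to_row(raw):
    """Flatten one Debezium-style JSON envelope into (op, row).

    The row gets an extra `time` column derived from the envelope's ts_ms,
    so that it could serve as the event time attribute.
    """
    envelope = json.loads(raw)
    # For deletes the row image is in "before", otherwise in "after".
    image = envelope["before"] if envelope["op"] == "d" else envelope["after"]
    row = dict(image)
    # ts_ms is in epoch milliseconds; convert to a timezone-aware timestamp.
    row["time"] = datetime.fromtimestamp(envelope["ts_ms"] / 1000, tz=timezone.utc)
    return envelope["op"], row


# Hypothetical record for the topic_products table.
RAW = (
    '{"before": null, "after": {"id": 1, "name": "scooter"}, '
    '"op": "c", "ts_ms": 1592971206000}'
)
OP, ROW = to_row(RAW)
```

Because every record, including a DELETE, carries its own ts_ms, the derived
column is defined for all change types.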


I hope I roughly understood your concerns and made sense in my comments.
Looking forward to what you think.

Cheers,

Konstantin


[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records
*  In this case, the DELETE statement could theoretically actually be
handled properly, because the "ts_ms" is used throughout.

On Sun, Jun 28, 2020 at 8:05 AM Jingsong Li <ji...@gmail.com> wrote:

> Thanks for your discussion.
>
> Looks like the problem is supporting the versioned temporal table for the
> changelog source.
>
> I want to share more of my thoughts:
>
> When I think about changelog sources, I treat it as a view like: "CREATE
> VIEW changelog_table AS SELECT ... FROM origin_table GROUP BY ..." (Some
> queries produce changelog records).
>
> Does this view support window aggregation? No...
> Does this view support versioned temporal tables? No...
>
> Because both window aggregation and versioned temporal tables require a
> time attribute.
>
> So can we give this view a new time attribute?
> 1. No, keep it unsupported.
> 2. Use processing time.
> 3. There is an operation time in this view, something like the processing time
> when modifying the origin table. Treat this operation time as rowtime.
> 4. Introduce a new time attribute concept: operation time. Assuming it
> increases monotonically, no watermark is needed.
>
> NOTE: For the versioned temporal table, there is a time-relation between
> these two tables. This time attribute must be something the user can perceive.
>
> I am slightly +1 for #1 and #2.
> For #1: If users really want a versioned temporal table on top of a
> changelog source, they can change the definition and declare the
> changelog source as a regular table. Then they have an operation time field
> in the table schema and can use this field as a rowtime field.
> For #2: This versioned temporal table is joined in the processing-time
> way, which means we assume records arrive monotonically. I think this
> matches the changelog concept well.
>
> -1 for #3 and #4.
> It can work, but I think it is hard to understand what the rowtime
> attribute is after "changing" the table.
> And I don't think it is worth creating another concept for users.
>
> Best,
> Jingsong Lee
>
> On Thu, Jun 25, 2020 at 10:30 PM Jark Wu <im...@gmail.com> wrote:
>
> > Hi all,
> >
> > Thanks Leonard for summarizing our discussion. I want to share more of my
> > thoughts:
> >
> > * rowtime is a column in the schema, so the rowtime of a DELETE event is
> > the value of the previous image.
> > * operation time is the time when the DML statement happens in the database,
> > so the operation time of a DELETE event is the time when it happens.
> > * rowtime can't be used as operation time for history tracking
> > * operation time can't be used as rowtime (can't apply window on the
> > operation time)
> > * rowtime and operation time are orthogonal concepts and used in different
> > scenarios.
> > * operation time implicitly means it is monotonically increasing, so we don't
> > need watermark syntax to specify the out-of-orderness for it.
> >
> > ======================================================================
> > So, conclusion from my side so far:
> >
> > * watermark/rowtime + primary key + changelog source != versioned temporal
> > table
> > * operation time + primary key + changelog source == versioned temporal
> > table
> > * We may need something like 'PERIOD FOR SYSTEM_TIME(op_ts)' to define the
> > operation time
> >
> > ======================================================================
> > However, there is still a pending question I don't have an answer to:
> >
> > Assuming you are doing a MIN aggregate on the operation time, that doesn't
> > work because the DELETE/UPDATE_BEFORE doesn't hold
> > the previous value of operation time and thus can't retract.
> >
> > The operation time in fact should be metadata information (just like
> > RowKind) which shouldn't be in the schema, and can't be accessed in queries.
> > But the PERIOD FOR SYSTEM_TIME syntax is in the schema part and should
> > refer to a field in the schema...
> >
> > ======================================================================
> >
> > Anyway, let's focus on the operation_time vs rowtime problem first. Let me
> > know what you think!
> >
> > Best,
> > Jark
> >
> > On Wed, 24 Jun 2020 at 23:49, Leonard Xu <xb...@gmail.com> wrote:
> >
> > > Hi Kurt, Fabian,
> > >
> > > After an offline discussion with Jark, we think that the 'PERIOD FOR
> > > SYSTEM_TIME(operation_time)' statement might be needed now. A changelog
> > > table is a superset of an insert-only table. Using PRIMARY KEY and rowtime
> > > may work well for an insert-only or upsert source but has some problems
> > > for a changelog table.
> > >
> > > 'PERIOD FOR SYSTEM_TIME(operation_time)' in a temporal table
> > > defines/maintains the valid time of each row; the rowtime cannot serve
> > > the history tracking function well.
> > >
> > > *# 1.*operation time (version time) *vs* rowtime (watermark)
> > >
> > > I will take an example to explain. The following changelog records came
> > > from a database table using the Debezium tool:
> > > { "before": null,
> > >   "after":  {"currency": "Euro", "rate": 118, "gmt_modified": "12:00:01"},
> > >   "op":     "c",  // INSERT
> > >   "ts_ms":  1592971201000  // 2020-06-24 12:00:02
> > > }
> > > { "before": {"currency": "Euro", "rate": 114, "gmt_modified": "12:00:05"},
> > >   "after":  {"currency": "Euro", "rate": 118, "gmt_modified": "12:00:05"},
> > >   "op":     "u",  // UPDATE
> > >   "ts_ms":  1592971206000  // 2020-06-24 12:00:06
> > > }
> > >
> > > { "before": {"currency": "Euro", "rate": 118, "gmt_modified": "12:00:05"},
> > >   "after":  null,
> > >   "op":     "d",  // DELETE
> > >   "ts_ms":  1593000011000  // 2020-06-24 20:00:11
> > > }
> > >
> > > The rowtime should be the "gmt_modified" field that belongs to the
> > > original record; the "ts_ms" is the operation time when the DML
> > > statement happens in the DB. For a DELETE changelog record, its
> > > "gmt_modified" field (12:00:05) cannot reflect the real operation time
> > > (20:00:11).
> > >
> > > In the temporal join case, we should maintain the valid time of each row.
> > > For a DELETE event, we should use the operation time of the DELETE as the
> > > "end time" of the row. That is, the record {"currency": "Euro", "rate": 118}
> > > no longer exists after "20:00:11", not "12:00:05".
> > >
> > > If we use rowtime to track the history version, we cannot access the record
> > > {"currency": "Euro", "rate": 118, "gmt_modified": "12:00:05"} once rowtime
> > > is bigger than (12:00:05), because the DELETE changelog
> > > record also has rowtime (12:00:05) and will clear the record in state. In
> > > fact, the expected result is that the record expires at (20:00:11), when
> > > the record is deleted, rather than at the last update time (12:00:05) in
> > > materialized state.
> > >
> > > From this case, I found rowtime and operation time should be orthogonal in
> > > the temporal table scenario. The operation time should be strictly
> > > monotonically increasing (no out of order) and only be used for tracking the
> > > history versions of a changelog table. Every history version of a changelog
> > > table equals a database table snapshot due to the stream-table duality.
> > >
> > > *# 2.*The semantic of rowtime and watermark on changelog
> > >
> > > The rowtime and watermark can also be defined on a changelog table just
> > > like other source backed queue, Flink supports cascaded window
> > aggregation
> > > (with ) in SQL like:
> > > SELECT
> > >      TUMBLE_ROWTIME(rowtime, INTERVAL '60' SECOND),
> > >      MAX(rate) AS rate
> > > FROM (
> > >        SELECT
> > >           MAX(rate) AS rate,
> > >           TUMBLE_ROWTIME(rowtime, INTERVAL '5' SECOND) AS `rowtime`
> > >        FROM currency
> > >        GROUP BY TUMBLE(rowtime, INTERVAL '5' SECOND)
> > >     )
> > > GROUP BY TUMBLE(rowtime, INTERVAL '60' SECOND)
> > >
> > > We can think of the output of the first window aggregation as a changelog
> > > source of the second window aggregation. There are INSERT/UPDATE/DELETE
> > > messages and also watermarks in the changelog stream. And the rowtime in
> > > the changelog stream is the `TUMBLE_ROWTIME` value (just like the
> > > `gmt_modified` column in DB).
> > >
> > > *#  summary*
> > >
> > >    1. We should use the 'PERIOD FOR SYSTEM_TIME(operation_time)' syntax to
> > >    track history versions by operation time rather than rowtime in the
> > >    temporal table scenario.
> > >    2. We also support defining a rowtime (watermark) on a changelog table,
> > >    but the rowtime will not be used to track the history of the changelog
> > >    stream.
> > >
> > >
> > >
> > > WDYT? please correct me if I am wrong.
> > >
> > >
> > > Best,
> > >
> > > Leonard
> > >
> > >
> > >
> > >
> > > On Jun 24, 2020, at 11:31, Leonard Xu <xb...@gmail.com> wrote:
> > >
> > > Hi, everyone
> > >
> > > Thanks Fabian, Kurt for making the multiple version (event time) clear. I
> > > also like the 'PERIOD FOR SYSTEM' syntax, which is supported in the SQL
> > > standard. I think we can add some explanation of the multiple version
> > > support in the future section of the FLIP.
> > >
> > > For the PRIMARY KEY semantic, I agree with Jark's point that the semantic
> > > should be unified across changelog sources and insert-only sources.
> > >
> > > Currently, Flink supports PRIMARY KEY after FLIP-87. Flink uses PRIMARY
> > > KEY NOT ENFORCED because Flink does not own the data like other DBMS;
> > > therefore Flink won't validate/enforce the key integrity and only trusts
> > > the external systems. The user and the external system/application are
> > > expected to make sure no duplicate records occur when using NOT ENFORCED.
> > >
> > > (a) For PRIMARY KEY NOT ENFORCED semantic on changelog source:
> > > It means the materialized changelogs (INSERT/UPDATE/DELETE) should be
> > > unique on the primary key constraints. Flink assumes messages are in order
> > > on the primary key. Flink will use the PRIMARY KEY for some optimization,
> > > e.g. use the PRIMARY KEY to update the materialized state by key in
> > > temporal join operator.
> > >
> > >
> > > (b) For PRIMARY KEY NOT ENFORCED semantic on insert-only source:
> > > It means records should be unique on the primary key constraints. If there
> > > are INSERT records with duplicate primary key columns, the result of the SQL
> > > query might be nondeterministic because it breaks the PRIMARY KEY
> > > constraints.
> > >
> > > Cheers,
> > > Leonard
> > >
> > >
> > > On Jun 23, 2020, at 23:35, Fabian Hueske <fh...@gmail.com> wrote:
> > >
> > > Thanks Kurt,
> > >
> > > Yes, you are right.
> > > The `PERIOD FOR SYSTEM_TIME` that you linked before corresponds to the
> > > VERSION clause that I used and would explicitly define the versioning of a
> > > table.
> > > I didn't know that the `PERIOD FOR SYSTEM_TIME` clause is already defined by
> > > the SQL standard.
> > > I think we would need a slightly different syntax though because (so far)
> > > the validity of a row is determined by its own timestamp and the timestamp
> > > of the next row.
> > >
> > > Adding a clause later solves the ambiguity issue for tables with multiple
> > > event-time attributes.
> > > However, I'd feel more comfortable having such a clause and an explicit
> > > definition of the temporal property from the beginning.
> > > I guess this is a matter of personal preference so I'll go with the
> > > majority if we decide that every table that has a primary key and an
> > > event-time attribute should be usable in an event-time temporal table join.
> > >
> > > Thanks, Fabian
> > >
> > >
> > > On Tue, Jun 23, 2020 at 4:58 PM Kurt Young <ykt836@gmail.com> wrote:
> > >
> > > Hi Fabian,
> > >
> > > I agree with you that implicitly letting event time be the version of
> > > the table will work in most cases, but not in all. That's the reason I
> > > mentioned the `PERIOD FOR` [1] syntax in my first email, which is already
> > > in the SQL standard to represent the validity of each row in the table.
> > >
> > > If the event time can't be used, or multiple event times are defined, we
> > > could still add this syntax in the future.
> > >
> > > What do you think?
> > >
> > > [1]
> > > https://docs.microsoft.com/en-us/sql/relational-databases/tables/temporal-tables?view=sql-server-ver15
> > >
> > > Best,
> > > Kurt
> > >
> > >
> > > On Tue, Jun 23, 2020 at 9:12 PM Fabian Hueske <fh...@gmail.com> wrote:
> > >
> > > Hi everyone,
> > >
> > > Every table with a primary key and an event-time attribute provides what is
> > > needed for an event-time temporal table join.
> > > I agree that, from a technical point of view, the TEMPORAL keyword is not
> > > required.
> > >
> > > I'm more sceptical about implicitly deriving the versioning information of
> > > a (temporal) table as the table's only event-time attribute.
> > > In the query
> > >
> > > SELECT *
> > > FROM orders o, rates r FOR SYSTEM_TIME AS OF o.ordertime
> > > WHERE o.currency = r.currency
> > >
> > > the syntax of the temporal table join does not explicitly reference the
> > > version of the temporal rates table.
> > > Hence, the system needs a way to derive the version of the temporal table.
> > >
> > > Implicitly using the (only) event-time attribute of a temporal table (rates
> > > in the example above) to identify the right version works in most cases,
> > > but probably not in all.
> > > * What if a table has more than one event-time attribute? (TableSchema is
> > > designed to support multiple watermarks; queries with interval joins
> > > produce tables with multiple event-time attributes, ...)
> > > * What if the table does not have an event-time attribute in its schema but
> > > the version should only be provided as meta data?
> > >
> > > We could add a clause to define the version of a table, such as:
> > >
> > > CREATE TABLE rates (
> > >   currency CHAR(3) NOT NULL PRIMARY KEY,
> > >   rate DOUBLE,
> > >   rowtime TIMESTAMP,
> > >   WATERMARK FOR rowtime AS rowtime - INTERVAL '5' MINUTE),
> > > VERSION (rowtime)
> > > WITH (...);
> > >
> > > The presence of the VERSION clause (or whatever syntax) would explicitly
> > > define the version of a (temporal) table.
> > > It would also render the TEMPORAL keyword superfluous because
> > > there would be another indicator that a table can be used in a temporal
> > > table join.
> > >
> > > I'm OK with not adding the TEMPORAL keyword, but I recommend that we think
> > > again about the proposed implicit definition of a table's version and how
> > > it might limit use in the future.
> > >
> > > Cheers,
> > > Fabian
> > >
> > > On Mon, Jun 22, 2020 at 4:14 PM Jark Wu <im...@gmail.com> wrote:
> > >
> > > I'm also +1 for not adding the TEMPORAL keyword.
> > >
> > > +1 to make the PRIMARY KEY semantic clear for sources.
> > > From my point of view:
> > >
> > > 1) PRIMARY KEY on changelog source:
> > > It means that when the changelogs (INSERT/UPDATE/DELETE) are materialized,
> > > the materialized table should be unique on the primary key columns.
> > > Flink assumes messages are in order on the primary key. Flink doesn't
> > > validate/enforce the key integrity, but simply trusts it (thus NOT
> > > ENFORCED).
> > > Flink will use the PRIMARY KEY for some optimization, e.g. use the PRIMARY
> > > KEY to update the materialized state by key in temporal join operator.
> > >
> > > 2) PRIMARY KEY on insert-only source:
> > > I prefer to have the same semantic as for the batch source and changelog
> > > source, i.e. it implies that records are not duplicated on the primary key.
> > > Flink simply trusts the primary key constraint, and doesn't validate it.
> > > If there are duplicate primary keys with INSERT changeflag, then the result
> > > of the Flink query might be wrong.
> > >
> > > If this is a TEMPORAL TABLE FUNCTION scenario, where the source emits
> > > duplicate primary keys with INSERT changeflag, then when we migrate this
> > > case to temporal table DDL,
> > > I think this source should emit INSERT/UPDATE (UPSERT) messages instead of
> > > INSERT-only messages, e.g. a Kafka compacted topic source?
> > >
> > > Best,
> > > Jark
> > >
> > >
> > > On Mon, 22 Jun 2020 at 17:04, Konstantin Knauf <kn...@apache.org> wrote:
> > >
> > >
> > > Hi everyone,
> > >
> > > I also agree with Leonard/Kurt's proposal for CREATE TEMPORAL TABLE.
> > >
> > > Best,
> > >
> > > Konstantin
> > >
> > > On Mon, Jun 22, 2020 at 10:53 AM Kurt Young <yk...@gmail.com> wrote:
> > >
> > >
> > > I agree with Timo, semantic about primary key needs more thought and
> > > discussion, especially after FLIP-95 and FLIP-105.
> > >
> > > Best,
> > > Kurt
> > >
> > >
> > > On Mon, Jun 22, 2020 at 4:45 PM Timo Walther <tw...@apache.org> wrote:
> > >
> > >
> > > Hi Leonard,
> > >
> > > thanks for the summary.
> > >
> > > After reading all of the previous arguments and working on FLIP-95, I
> > > would also lean towards the conclusion of not adding the TEMPORAL keyword.
> > >
> > > After FLIP-95, what we considered as a CREATE TEMPORAL TABLE can be
> > > represented as a CREATE TABLE with PRIMARY KEY and WATERMARK. The FOR
> > > SYSTEM_TIME AS OF t would trigger the internal materialization and
> > > "temporal" logic.
> > >
> > > However, we should discuss the meaning of PRIMARY KEY again in this
> > > case. In a TEMPORAL TABLE scenario, the source would emit duplicate
> > > primary keys with INSERT changeflag but at different points in time.
> > > Currently, we require a PRIMARY KEY NOT ENFORCED declaration. The
> > > changelog semantics of FLIP-95 and FLIP-105 don't work well with a
> > > primary key declaration.
> > >
> > > Regards,
> > > Timo
> > >
> > >
> > > On 20.06.20 17:08, Leonard Xu wrote:
> > >
> > > Hi everyone,
> > >
> > > Thanks for the nice discussion. I'd like to move the work forward, so
> > > please let me briefly summarize the main opinions and current divergences.
> > >
> > >
> > > 1. The agreements that have been achieved:
> > >
> > > 1.1 The motivation for discussing temporal table DDL is just
> > > creating temporal tables in pure SQL, replacing the pre-processed temporal
> > > table in YAML/Table API, for usability.
> > >
> > > 1.2 The reason we use the "TEMPORAL" keyword rather than "PERIOD FOR
> > > SYSTEM_TIME" is to make it easy for users to understand.
> > >
> > > 1.3 An append-only table can be converted to a changelog table, which has
> > > been discussed in FLIP-105. We assume the following temporal table
> > > comes from a changelog (Jark, Fabian, Timo).
> > >
> > > 1.4 For temporal join syntax, using "FOR SYSTEM_TIME AS OF x" instead of
> > > the current `LATERAL TABLE(rates(x))` has been agreed on (Fabian,
> > > Timo, Seth, Konstantin, Kurt).
> > >
> > >
> > > 2. The small divergence:
> > >
> > > About the definition syntax of the temporal table,
> > >
> > > CREATE [TEMPORAL] TABLE rates (
> > >    currency CHAR(3) NOT NULL PRIMARY KEY,
> > >    rate DOUBLE,
> > >    rowtime TIMESTAMP,
> > >    WATERMARK FOR rowtime AS rowtime - INTERVAL '5' MINUTE)
> > > WITH (...);
> > >
> > > there is a small divergence on whether to add the "TEMPORAL" keyword or not.
> > >
> > >
> > > 2.1 One opinion is using "CREATE TEMPORAL TABLE" (Timo, Fabian, Seth);
> > > the main advantages are:
> > >
> > > (1) The "TEMPORAL" keyword is intuitive to indicate the history tracking
> > > semantics.
> > >
> > > (2) The "TEMPORAL" keyword illustrates that queries can visit the previous
> > > versions of a table, like other DBMS use the "PERIOD FOR SYSTEM_TIME"
> > > keyword.
> > >
> > >
> > > 2.2 The other is using "CREATE TABLE" (Kurt); the main advantages are:
> > >
> > > (1) Just a primary key and a time attribute can track previous versions of
> > > a table well.
> > >
> > > (2) The temporal behavior is triggered by the temporal join syntax rather
> > > than in DDL. All Flink DDL tables are logically dynamic tables, including
> > > temporal tables. If we decide to use the "TEMPORAL" keyword and treat a
> > > changelog as a temporal table, other tables backed by a queue like Kafka
> > > should also use the "TEMPORAL" keyword.
> > >
> > >
> > >
> > > IMO, the statement "CREATE TEMPORARY TEMPORAL TABLE..." that follows from
> > > 2.1 may confuse users a lot. If we take a second to think about it, for
> > > source/sink tables which may be backed by a queue (like Kafka) or a DB
> > > (like MySQL), we did not add any keyword in DDL to specify whether they
> > > are sources or sinks, and it works well.
> > >
> > > I think the temporal table is the third one: a Kafka data source and a DB
> > > data source can act as a source/sink/temporal table depending on the
> > > position/syntax in which the user puts them in the query. The above rates
> > > table
> > >
> > >     - can be a source table if the user puts it at `SELECT * FROM rates;`
> > >
> > >     - can be a temporal table if the user puts it at `SELECT * FROM orders
> > > JOIN rates FOR SYSTEM_TIME AS OF orders.proctime
> > >              ON orders.currency = rates.currency;`
> > >     - can be a sink table if the user puts it at `INSERT INTO rates SELECT *
> > > FROM …;`
> > >
> > > From these cases, we found that all tables defined in Flink should be
> > > dynamic tables logically; the source/sink/temporal role depends on the
> > > position/syntax in the user's query.
> > >
> > >       In fact we have used similar syntax for the current lookup table:
> > > we didn't add a "LOOKUP" or "TEMPORAL" keyword for lookup tables, and
> > > trigger the temporal join from the position/syntax ("FOR SYSTEM_TIME AS OF
> > > x") in the query.
> > >
> > >
> > > So, I prefer to resolve the small divergence with "CREATE TABLE", which
> > >
> > > (1) is more unified with our source/sink/temporal dynamic table concept,
> > >
> > > (2) is aligned with the current lookup table,
> > > (3) also makes users learn fewer keywords.
> > >
> > > Best,
> > > Leonard Xu
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > > --
> > >
> > > Konstantin Knauf
> > >
> > > https://twitter.com/snntrable
> > >
> > > https://github.com/knaufk
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> >
>
>
> --
> Best, Jingsong Lee
>


-- 

Konstantin Knauf

https://twitter.com/snntrable

https://github.com/knaufk

Re: [DISCUSS] [FLINK-16824] Creating Temporal Table Function via DDL

Posted by Seth Wiesman <sj...@gmail.com>.
It is clear there are a lot of edge cases with temporal tables that need to
be carefully thought out. If we go at this problem from the perspective of
what a majority of users need to accomplish in production, I believe there
is a simpler version of this problem we can solve that can be expanded in
the future.

The most important practical use case is denormalizing star schemas. A user
has the main data stream, their fact table, that needs to be processed.
Before applying specific business logic, the main stream needs to be
joined with one or more dimension streams. The canonical example of this
is joining transactions with currency rates. I'm not saying all this to
be pedantic but to make the point that if we can solve this practical use
case in a way that may be extended in the future, I believe that will
already be immensely useful for most users.

In this case, the data is most likely coming from CDC or something that
approximates it and contains a clearly defined event time column.

For this common use case the syntax would only need to support:

   - Single event time column
   - Joining with externally defined upsert streams. In the first version,
   a stream could only be used as a temporal table if the join was the first
   operation after reading from an external source. We could disallow using
   streams produced by a Flink aggregation as temporal tables in the beginning,
   until there is a larger consensus on what timestamp to use.
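
As a rough illustration of this reduced scope (one event time column, an
externally defined upsert stream, join directly after the source), the
semantics can be sketched in a few lines of Python. This is a batch-style
simulation, not Flink code: it ignores watermarks and state cleanup, and the
function name, tuple layout and sample data are assumptions for illustration.

```python
from bisect import bisect_right
from collections import defaultdict

def temporal_join(facts, upserts):
    """Enrich each fact with the dimension value valid at its event time.

    facts:   iterable of (key, event_time, amount)
    upserts: iterable of (key, event_time, value); each upsert starts a new
             version of `key` that is valid until the next upsert for it.
    """
    history = defaultdict(list)  # key -> [(event_time, value), ...]
    for key, event_time, value in sorted(upserts, key=lambda u: u[1]):
        history[key].append((event_time, value))

    joined = []
    for key, event_time, amount in facts:
        versions = history.get(key, [])
        idx = bisect_right([start for start, _ in versions], event_time) - 1
        if idx >= 0:  # inner join: drop facts that precede the first version
            joined.append((key, event_time, amount, versions[idx][1]))
    return joined


# Hypothetical data: currency rates (dimension) and transactions (facts).
RATES = [("Euro", 1, 114), ("Euro", 5, 118), ("Yen", 2, 1)]
ORDERS = [("Euro", 3, 10), ("Euro", 7, 20), ("Yen", 1, 5)]
JOINED = temporal_join(ORDERS, RATES)
```

Each order picks up the rate whose event time is the latest one not after the
order's own event time; the Yen order is dropped because no rate existed yet.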

Seth

On Thu, Jul 2, 2020 at 11:49 AM Konstantin Knauf <kn...@apache.org> wrote:

> Hi everyone,
>
> well, this got complicated :) Let me add my thoughts:
>
> * Temporal Table Joins are already quite hard to understand for many users.
> If need be, we should trade off for simplicity.
>
> * The important case is the *event time* temporal join. In my understanding
> processing time temporal joins are comparatively easy: no history tracking is
> needed, etc.
>
> * It seems that for regular upsert streams with an event time attribute,
> everyone agrees that it works. There are only questions about multiple
> event time attributes, which we could in my opinion postpone for future
> work.
>
> * For changelog streams, which specify an event time column explicitly, it
> should be possible to use it for event time temporal tables. I understand
> that deletion can not be handled properly, but we could - for example -
> handle this exactly like an upsert stream, i.e. ignore deletions. This is a
> limitation, but it is at least easy to understand and acceptable for many
> use cases, I believe. Alternatively, one could also use the "ts_ms" for
> deletion, which would always be larger than the event time.
>
> CREATE TABLE currency_rates (
>   id BIGINT,
>   name STRING,
>   rate DECIMAL(10, 5),
>   time TIMESTAMP(3),
>   WATERMARK FOR time AS ...
> ) WITH (
>   'connector' = 'kafka',
>   ...
>   'format' = 'debezium-json'
> )
>
>
> * For changelog streams without an event time attribute (the more common
> case?), it would be great if we can support temporal table joins based on
> "ts_ms" (in the debezium case). One option could be to "simply" extract
> "ts_ms" and make it possible to use it as an event time column. Then we
> would again be in the above case. Thinking about it, this could even be
> addressed in [1], which is also planned for Flink 1.12 as far as I know. *
> This could look something like:
>
> CREATE TABLE topic_products (
>   id BIGINT,
>   name STRING,
>   description STRING,
>   weight DECIMAL(10, 2),
>   time TIMESTAMP(3)
> ) WITH (
>   'connector' = 'kafka',
>   ...
>   'format' = 'debezium-json',
>   'timestamp' = 'time'
> )
>
>
> I hope I roughly understood your concerns and made sense in my comments.
> Looking forward to what you think.
>
> Cheers,
>
> Konstantin
>
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records
> *  In this case, the DELETE statement could theoretically actually be
> handled properly, because the "ts_ms" is used throughout.
>
> On Sun, Jun 28, 2020 at 8:05 AM Jingsong Li <ji...@gmail.com>
> wrote:
>
> > Thanks for your discussion.
> >
> > Looks like the problem is supporting the versioned temporal table for the
> > changelog source.
> >
> > I want to share more of my thoughts:
> >
> > When I think about changelog sources, I treat it as a view like: "CREATE
> > VIEW changelog_table AS SELECT ... FROM origin_table GROUP BY ..." (Some
> > queries produce changelog records).
> >
> > Does this view support window aggregation? No...
> > Does this view support versioned temporal tables? No...
> >
> > Because both window aggregation and versioned temporal tables require a
> > time attribute.
> >
> > So can we give this view a new time attribute?
> > 1. No, keep it not supported.
> > 2. Using processing time.
> > 3. there is an operation time in this view, something like processing
> time
> > when modifying the origin table. Treat this operation time as rowtime.
> > 4. Introduce a new time attribute concept: operation time. Assuming it
> > monotonically increases, no watermark.
> >
> > NOTE: For the versioned temporal table, there is a time-relation between
> > these two tables. This time attribute must be something user perceived.
> >
> > I am slightly +1 for #1 and #2.
> > For #1: If users really want to support the versioned temporal table for
> > the changelog source. They can change the definition. And make the
> > changelog source as a regular table, then they have an operation time
> field
> > in the table schema, they can use this field as a rowtime field.
> > For #2: This versioned temporal table is joined using the processing-time
> > way, it means we assume records come in a monotonically way, I think it
> is
> > good to match changelog concept.
> >
> > -1 for #3 and #4.
> > It can work, but I think it is hard to understand what the rowtime
> > attribute is after "changing" the table.
> > And I don't think it is worth creating another concept for users.
> >
> > Best,
> > Jingsong Lee
> >
> > On Thu, Jun 25, 2020 at 10:30 PM Jark Wu <im...@gmail.com> wrote:
> >
> > > Hi all,
> > >
> > > Thanks Leonard for summarizing our discussion. I want to share more of
> > > my thoughts:
> > >
> > > * rowtime is a column in the schema, so the rowtime of a DELETE event
> > > is the value of the previous image.
> > > * operation time is the time when the DML statement happens in the
> > > database, so the operation time of a DELETE event is the time when the
> > > deletion happens.
> > > * rowtime can't be used as operation time for history tracking
> > > * operation time can't be used as rowtime (can't apply window on the
> > > operation time)
> > > * rowtime and operation time are orthogonal concepts and are used in
> > > different scenarios.
> > > * operation time implicitly means it is monotonically increasing, so we
> > > don't need watermark syntax to specify the out-of-orderness for it.
> > >
> > > ======================================================================
> > > So, conclusion from my side so far:
> > >
> > > * watermark/rowtime + primary key + changelog source != versioned
> > > temporal table
> > > * operation time + primary key + changelog source == versioned temporal
> > > table
> > > * We may need something like 'PERIOD FOR SYSTEM_TIME(op_ts)' to define
> > > the operation time
> > >
> > > ======================================================================
> > > However, there is still a pending question I don't have an answer to:
> > >
> > > Assume you are doing a MIN aggregate on the operation time. That doesn't
> > > work, because the DELETE/UPDATE_BEFORE record doesn't hold the previous
> > > value of the operation time and thus can't be retracted.
> > >
> > > The operation time should in fact be metadata (just like RowKind), which
> > > shouldn't be in the schema and can't be accessed in queries.
> > > But the PERIOD FOR SYSTEM_TIME syntax is in the schema part and should
> > > refer to a field in the schema...
> > >
> > > ======================================================================
> > >
> > > Anyway, let's focus on the operation_time vs rowtime problem first. Let
> > > me know what you think!
> > >
> > > Best,
> > > Jark
> > >
> > > On Wed, 24 Jun 2020 at 23:49, Leonard Xu <xb...@gmail.com> wrote:
> > >
> > > > Hi Kurt, Fabian
> > > >
> > > > After an offline discussion with Jark, we think that the 'PERIOD FOR
> > > > SYSTEM_TIME(operation_time)' statement might be needed now. A
> > > > changelog table is a superset of an insert-only table. Using PRIMARY
> > > > KEY and rowtime may work well for an insert-only or upsert source, but
> > > > it has some problems for a changelog table.
> > > >
> > > > 'PERIOD FOR SYSTEM_TIME(operation_time)' in a temporal table
> > > > defines/maintains the valid time of each row; the rowtime can not
> > > > fulfil the history tracking function well.
> > > >
> > > > *# 1.* operation time (version time) *vs* rowtime (watermark)
> > > >
> > > > I will take an example to explain. The following changelog records
> > > > come from a database table captured with the Debezium tool:
> > > > { "before": null,
> > > >   "after":  {"currency": "Euro", "rate": 114, "gmt_modified":
> > > > "12:00:01"},
> > > >   "op":     "c",  // INSERT
> > > >   "ts_ms":  1592971202000  // 2020-06-24 12:00:02
> > > > }
> > > > { "before": {"currency": "Euro", "rate": 114, "gmt_modified":
> > > > "12:00:01"},
> > > >   "after":  {"currency": "Euro", "rate": 118, "gmt_modified":
> > > > "12:00:05"},
> > > >   "op":     "u",  // UPDATE
> > > >   "ts_ms":  1592971206000  // 2020-06-24 12:00:06
> > > > }
> > > >
> > > > { "before": {"currency": "Euro", "rate": 118, "gmt_modified":
> > > > "12:00:05"},
> > > >   "after":  null,
> > > >   "op":     "d",  // DELETE
> > > >   "ts_ms":  1593000011000  // 2020-06-24 20:00:11
> > > > }
> > > >
> > > > The rowtime should be the "gmt_modified" field that belongs to the
> > > > original record; "ts_ms" is the operation time at which the DML
> > > > statement happened in the DB. For a DELETE changelog record, its
> > > > "gmt_modified" field (12:00:05) can not reflect the real operation
> > > > time (20:00:11).
> > > >
> > > > In the temporal join case, we should maintain the valid time of each
> > > > row. For a DELETE event, we should use the operation time of the
> > > > DELETE as the “end time” of the row. That is, the record {"currency":
> > > > "Euro", "rate": 118} no longer exists after “20:00:11”, not after
> > > > “12:00:05”.
> > > >
> > > > If we used rowtime to track the history versions, we could not access
> > > > the record {"currency": "Euro", "rate": 118, "gmt_modified":
> > > > "12:00:05"} for any rowtime later than 12:00:05, because the DELETE
> > > > changelog record also has rowtime 12:00:05 and would clear the record
> > > > from state. In fact, the expected result is that the record stays
> > > > valid until 20:00:11, when it is deleted, rather than expiring at its
> > > > last update time (12:00:05) in the materialized state.
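
To make the difference concrete, here is a minimal Python sketch (not Flink code) that builds the version history of the Euro row twice: once keyed by rowtime ("gmt_modified") and once keyed by operation time ("ts_ms"). The names CHANGELOG, build_versions and lookup are made up for illustration, the values loosely follow the Debezium example above, and times are simplified to seconds since midnight.

```python
from bisect import bisect_right


def hms(text):
    """Convert 'HH:MM:SS' into seconds since midnight."""
    h, m, s = (int(part) for part in text.split(":"))
    return h * 3600 + m * 60 + s


# Hypothetical changelog for the key "Euro":
# (op, rate, rowtime = gmt_modified, operation time = ts_ms)
CHANGELOG = [
    ("c", 114, hms("12:00:01"), hms("12:00:02")),
    ("u", 118, hms("12:00:05"), hms("12:00:06")),
    # A DELETE carries the rowtime of the previous image.
    ("d", None, hms("12:00:05"), hms("20:00:11")),
]


def build_versions(changelog, use_operation_time):
    """Return a sorted list of (version_start, rate); rate None means deleted."""
    versions = []
    for _op, rate, rowtime, op_time in changelog:
        start = op_time if use_operation_time else rowtime
        versions.append((start, rate))
    # The sort is stable, so for equal timestamps the changelog order is kept
    # and the DELETE ends up after the UPDATE it shares a rowtime with.
    versions.sort(key=lambda version: version[0])
    return versions


def lookup(versions, t):
    """Return the rate valid at time t, or None if no version is valid."""
    starts = [start for start, _rate in versions]
    i = bisect_right(starts, t)
    return versions[i - 1][1] if i else None
```

With rowtime as the version, a lookup at 15:00:00 finds nothing, because the DELETE shares rowtime 12:00:05 with the last update. With operation time as the version, the rate 118 stays visible until 20:00:11.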
> > > >
> > > > From this case, I found that rowtime and operation time should be
> > > > orthogonal in the temporal table scenario. The operation time should
> > > > be strictly monotonically increasing (no out-of-order records) and
> > > > should only be used for tracking the history versions of a changelog
> > > > table. Every history version of a changelog table equals a database
> > > > table snapshot, due to the stream-table duality.
> > > >
> > > > *# 2.* The semantics of rowtime and watermark on changelog
> > > >
> > > > The rowtime and watermark can also be defined on a changelog table,
> > > > just like on other sources backed by a queue. Flink supports cascaded
> > > > window aggregation in SQL, like:
> > > > SELECT
> > > >     TUMBLE_ROWTIME(rowtime, INTERVAL '60' SECOND),
> > > >     MAX(rate) AS rate
> > > > FROM (
> > > >     SELECT
> > > >         MAX(rate) AS rate,
> > > >         TUMBLE_ROWTIME(rowtime, INTERVAL '5' SECOND) AS `rowtime`
> > > >     FROM currency
> > > >     GROUP BY TUMBLE(rowtime, INTERVAL '5' SECOND)
> > > > )
> > > > GROUP BY TUMBLE(rowtime, INTERVAL '60' SECOND)
> > > >
> > > > We can think of the output of the first window aggregation as a
> > > > changelog source of the second window aggregation. There are
> > > > INSERT/UPDATE/DELETE messages and also watermarks in the changelog
> > > > stream. And the rowtime in the changelog stream is the
> > > > `TUMBLE_ROWTIME` value (just like the `gmt_modified` column in DB).
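
As a rough illustration of how the rowtime propagates through the cascade, the following Python sketch (not Flink code) computes a tumbling MAX on a finite list of rows and feeds the result into a second, larger tumbling window. The function name tumble_max and the sample rows are assumptions for illustration; the only Flink behaviour relied on is that TUMBLE_ROWTIME returns the inclusive upper bound of the window (window end minus 1 ms). Watermarks and retractions are left out.

```python
def tumble_max(rows, size_ms):
    """Tumbling-window MAX over (rowtime_ms, rate) pairs.

    Returns a sorted list of (window_rowtime_ms, max_rate), where
    window_rowtime_ms is the inclusive upper bound of the window,
    i.e. window end minus 1 ms.
    """
    best = {}
    for rowtime, rate in rows:
        start = rowtime - rowtime % size_ms  # window this row falls into
        best[start] = max(rate, best.get(start, rate))
    return [(start + size_ms - 1, rate) for start, rate in sorted(best.items())]


# Hypothetical input rows: (rowtime in ms, rate)
rows = [(1_000, 114), (4_000, 118), (7_000, 110), (61_000, 120)]

five_seconds = tumble_max(rows, 5_000)
one_minute = tumble_max(five_seconds, 60_000)
```

Because the inner window's rowtime is its inclusive upper bound, each 5-second result still falls into the enclosing 60-second window, which is what lets the second aggregation group on it.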
> > > >
> > > > *# summary*
> > > >
> > > >    1. We should use the 'PERIOD FOR SYSTEM_TIME(operation_time)' syntax
> > > >    to track history versions by operation time rather than rowtime in
> > > >    the temporal table scenario.
> > > >    2. We also support defining a rowtime (watermark) on a changelog
> > > >    table, but the rowtime will not be used to track the history of the
> > > >    changelog stream.
> > > >
> > > >
> > > >
> > > > WDYT? Please correct me if I am wrong.
> > > >
> > > >
> > > > Best,
> > > >
> > > > Leonard
> > > >
> > > >
> > > >
> > > >
> > > > On Jun 24, 2020, at 11:31, Leonard Xu <xb...@gmail.com> wrote:
> > > >
> > > > Hi, everyone
> > > >
> > > > Thanks Fabian and Kurt for making the multiple version (event time)
> > > > point clear. I also like the 'PERIOD FOR SYSTEM_TIME' syntax, which is
> > > > supported in the SQL standard. I think we can add some explanation of
> > > > the multiple version support to the future work section of the FLIP.
> > > >
> > > > For the PRIMARY KEY semantics, I agree with Jark's point that the
> > > > semantics should be unified across changelog sources and insert-only
> > > > sources.
> > > >
> > > > Currently, Flink supports PRIMARY KEY after FLIP-87. Flink uses
> > > > PRIMARY KEY NOT ENFORCED because Flink does not own the data like
> > > > other DBMSs do; therefore Flink won't validate/enforce the key
> > > > integrity and only trusts the external systems. The user and the
> > > > external system/application are expected to make sure that no
> > > > duplicate records occur when using NOT ENFORCED.
> > > >
> > > > (a) For PRIMARY KEY NOT ENFORCED semantics on a changelog source:
> > > > It means the materialized changelog (INSERT/UPDATE/DELETE) should be
> > > > unique on the primary key constraint. Flink assumes messages are in
> > > > order on the primary key. Flink will use the PRIMARY KEY for some
> > > > optimizations, e.g. use the PRIMARY KEY to update the materialized
> > > > state by key in the temporal join operator.
> > > >
> > > >
> > > > (b) For PRIMARY KEY NOT ENFORCED semantics on an insert-only source:
> > > > It means records should be unique on the primary key constraint. If
> > > > there are INSERT records with duplicate primary key columns, the
> > > > result of the SQL query might be nondeterministic because the PRIMARY
> > > > KEY constraint is broken.
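
The effect of trusting a key that is not actually unique can be sketched in a few lines of Python (not Flink code). The function materialize is a hypothetical stand-in for keyed state that is updated by primary key without any uniqueness check; the change flags follow the short RowKind names +I, -U, +U and -D.

```python
def materialize(changelog):
    """Apply (op, key, value) records to a table keyed by primary key.

    There is deliberately no uniqueness check, mirroring NOT ENFORCED:
    the declared key is trusted and later records overwrite earlier ones.
    """
    table = {}
    for op, key, value in changelog:
        if op == "-D":
            table.pop(key, None)
        elif op in ("+I", "+U"):
            table[key] = value
        # "-U" (update-before) is ignored; the following "+U" overwrites.
    return table


# Two INSERTs for the same key violate the declared primary key, so the
# materialized result depends on the order in which they arrive.
first = materialize([("+I", "Euro", 114), ("+I", "Euro", 118)])
second = materialize([("+I", "Euro", 118), ("+I", "Euro", 114)])
```

Here `first` ends with rate 118 and `second` with rate 114, which is the kind of order-dependent result the paragraph above warns about.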
> > > >
> > > > Cheers,
> > > > Leonard
> > > >
> > > >
> > > > On Jun 23, 2020, at 23:35, Fabian Hueske <fh...@gmail.com> wrote:
> > > >
> > > > Thanks Kurt,
> > > >
> > > > Yes, you are right.
> > > > The `PERIOD FOR SYSTEM_TIME` that you linked before corresponds to
> > > > the VERSION clause that I used and would explicitly define the
> > > > versioning of a table.
> > > > I didn't know that the `PERIOD FOR SYSTEM_TIME` clause is already
> > > > defined by the SQL standard.
> > > > I think we would need a slightly different syntax though because (so
> > > > far) the validity of a row is determined by its own timestamp and the
> > > > timestamp of the next row.
> > > >
> > > > Adding a clause later solves the ambiguity issue for tables with
> > > > multiple event-time attributes.
> > > > However, I'd feel more comfortable having such a clause and an
> > > > explicit definition of the temporal property from the beginning.
> > > > I guess this is a matter of personal preference so I'll go with the
> > > > majority if we decide that every table that has a primary key and an
> > > > event-time attribute should be usable in an event-time temporal table
> > > > join.
> > > >
> > > > Thanks, Fabian
> > > >
> > > >
> > > > On Tue, Jun 23, 2020 at 4:58 PM Kurt Young <ykt836@gmail.com> wrote:
> > > >
> > > > Hi Fabian,
> > > >
> > > > I agree with you that implicitly letting event time be the version of
> > > > the table will work in most cases, but not in all. That's the reason I
> > > > mentioned the `PERIOD FOR` [1] syntax in my first email, which is
> > > > already in the SQL standard to represent the validity of each row in
> > > > the table.
> > > >
> > > > If the event time can't be used, or multiple event-time attributes
> > > > are defined, we could still add this syntax in the future.
> > > >
> > > > What do you think?
> > > >
> > > > [1]
> > > > https://docs.microsoft.com/en-us/sql/relational-databases/tables/temporal-tables?view=sql-server-ver15
> > > >
> > > > Best,
> > > > Kurt
> > > >
> > > >
> > > > On Tue, Jun 23, 2020 at 9:12 PM Fabian Hueske <fh...@gmail.com> wrote:
> > > >
> > > > Hi everyone,
> > > >
> > > > Every table with a primary key and an event-time attribute provides
> > > > what is needed for an event-time temporal table join.
> > > > I agree that, from a technical point of view, the TEMPORAL keyword is
> > > > not required.
> > > >
> > > > I'm more sceptical about implicitly deriving the versioning
> > > > information of a (temporal) table as the table's only event-time
> > > > attribute.
> > > > In the query
> > > >
> > > > SELECT *
> > > > FROM orders o, rates r FOR SYSTEM_TIME AS OF o.ordertime
> > > > WHERE o.currency = r.currency
> > > >
> > > > the syntax of the temporal table join does not explicitly reference
> > > > the version of the temporal rates table.
> > > > Hence, the system needs a way to derive the version of the temporal
> > > > table.
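
What the join has to do with the derived version can be sketched in Python (not Flink code): for each order, pick the latest rates version whose version time is not later than the order time. The function temporal_join, the tuple layouts and the sample data are assumptions for illustration, as is the choice that a version becomes visible exactly at its version time; which column supplies version_time is precisely the open question in this thread.

```python
from bisect import bisect_right
from collections import defaultdict


def temporal_join(orders, rate_versions):
    """Join each order with the rate version valid at its order time.

    orders:        iterable of (ordertime, currency, amount)
    rate_versions: iterable of (version_time, currency, rate)
    Yields (ordertime, currency, amount, rate); orders without a valid
    version are dropped, as in an inner join.
    """
    history = defaultdict(list)
    for version_time, currency, rate in sorted(rate_versions):
        history[currency].append((version_time, rate))

    for ordertime, currency, amount in orders:
        versions = history.get(currency, [])
        times = [t for t, _rate in versions]
        i = bisect_right(times, ordertime)  # versions with time <= ordertime
        if i:
            yield ordertime, currency, amount, versions[i - 1][1]


# Hypothetical data with abstract integer timestamps.
rates = [(10, "EUR", 114), (20, "EUR", 118), (15, "USD", 100)]
orders = [(5, "EUR", 1), (12, "EUR", 2), (20, "EUR", 3), (14, "USD", 4)]
joined = list(temporal_join(orders, rates))
```

The orders at time 5 (EUR) and time 14 (USD) are dropped because no version exists yet; the other two pick up rates 114 and 118.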
> > > >
> > > > Implicitly using the (only) event-time attribute of a temporal table
> > > > (rates in the example above) to identify the right version works in
> > > > most cases, but probably not in all.
> > > > * What if a table has more than one event-time attribute? (TableSchema
> > > > is designed to support multiple watermarks; queries with interval
> > > > joins produce tables with multiple event-time attributes, ...)
> > > > * What if the table does not have an event-time attribute in its
> > > > schema but the version should only be provided as metadata?
> > > >
> > > > We could add a clause to define the version of a table, such as:
> > > >
> > > > CREATE TABLE rates (
> > > >   currency CHAR(3) NOT NULL PRIMARY KEY,
> > > >   rate DOUBLE,
> > > >   rowtime TIMESTAMP,
> > > >   WATERMARK FOR rowtime AS rowtime - INTERVAL '5' MINUTE),
> > > > VERSION (rowtime)
> > > > WITH (...);
> > > >
> > > > The presence of the VERSION clause (or whatever syntax) would
> > > > explicitly define the version of a (temporal) table.
> > > > It would also render the TEMPORAL keyword superfluous because there
> > > > would be another indicator that a table can be used in a temporal
> > > > table join.
> > > >
> > > > I'm OK with not adding the TEMPORAL keyword, but I recommend that we
> > > > think again about the proposed implicit definition of a table's
> > > > version and how it might limit use in the future.
> > > >
> > > > Cheers,
> > > > Fabian
> > > >
> > > > On Mon, Jun 22, 2020 at 4:14 PM Jark Wu <imjark@gmail.com> wrote:
> > > >
> > > > I'm also +1 for not adding the TEMPORAL keyword.
> > > >
> > > > +1 to make the PRIMARY KEY semantic clear for sources.
> > > > From my point of view:
> > > >
> > > > 1) PRIMARY KEY on changelog source:
> > > > It means that when the changelogs (INSERT/UPDATE/DELETE) are
> > > > materialized, the materialized table should be unique on the primary
> > > > key columns.
> > > > Flink assumes messages are in order on the primary key. Flink doesn't
> > > > validate/enforce the key integrity, but simply trusts it (thus NOT
> > > > ENFORCED).
> > > > Flink will use the PRIMARY KEY for some optimizations, e.g. use the
> > > > PRIMARY KEY to update the materialized state by key in the temporal
> > > > join operator.
> > > >
> > > > 2) PRIMARY KEY on insert-only source:
> > > > I prefer to have the same semantics as for the batch source and the
> > > > changelog source, i.e. it implies that records are not duplicated on
> > > > the primary key.
> > > > Flink simply trusts the primary key constraint and doesn't validate
> > > > it.
> > > > If there are duplicate primary keys with the INSERT changeflag, then
> > > > the result of the Flink query might be wrong.
> > > >
> > > > If this is a TEMPORAL TABLE FUNCTION scenario in which the source
> > > > emits duplicate primary keys with the INSERT changeflag, then when we
> > > > migrate this case to temporal table DDL, I think this source should
> > > > emit INSERT/UPDATE (UPSERT) messages instead of INSERT-only messages,
> > > > e.g. a Kafka compacted topic source?
> > > >
> > > > Best,
> > > > Jark
> > > >
> > > >
> > > > On Mon, 22 Jun 2020 at 17:04, Konstantin Knauf <kn...@apache.org> wrote:
> > > >
> > > >
> > > > Hi everyone,
> > > >
> > > > I also agree with Leonard/Kurt's proposal for CREATE TEMPORAL TABLE.
> > > >
> > > > Best,
> > > >
> > > > Konstantin
> > > >
> > > > On Mon, Jun 22, 2020 at 10:53 AM Kurt Young <yk...@gmail.com> wrote:
> > > >
> > > >
> > > > I agree with Timo, the semantics of the primary key need more thought
> > > > and discussion, especially after FLIP-95 and FLIP-105.
> > > >
> > > > Best,
> > > > Kurt
> > > >
> > > >
> > > > On Mon, Jun 22, 2020 at 4:45 PM Timo Walther <tw...@apache.org> wrote:
> > > >
> > > >
> > > > Hi Leonard,
> > > >
> > > > thanks for the summary.
> > > >
> > > > After reading all of the previous arguments and working on FLIP-95, I
> > > > would also lean towards the conclusion of not adding the TEMPORAL
> > > > keyword.
> > > >
> > > > After FLIP-95, what we considered as a CREATE TEMPORAL TABLE can be
> > > > represented as a CREATE TABLE with PRIMARY KEY and WATERMARK. The FOR
> > > > SYSTEM_TIME AS OF t would trigger the internal materialization and
> > > > "temporal" logic.
> > > >
> > > > However, we should discuss the meaning of PRIMARY KEY again in this
> > > > case. In a TEMPORAL TABLE scenario, the source would emit duplicate
> > > > primary keys with INSERT changeflag but at different points in time.
> > > > Currently, we require a PRIMARY KEY NOT ENFORCED declaration. The
> > > > changelog semantics of FLIP-95 and FLIP-105 don't work well with a
> > > > primary key declaration.
> > > >
> > > > Regards,
> > > > Timo
> > > >
> > > >
> > > > On 20.06.20 17:08, Leonard Xu wrote:
> > > >
> > > > Hi everyone,
> > > >
> > > > Thanks for the nice discussion. I’d like to move the work forward, so
> > > > please let me briefly summarize the main opinions and the current
> > > > divergences.
> > > >
> > > > 1. The agreements that have been reached:
> > > >
> > > > 1.1 The motivation for discussing temporal table DDL is just to create
> > > > temporal tables in pure SQL, replacing the pre-processed temporal
> > > > tables in YAML/Table API, for usability.
> > > >
> > > > 1.2 The reason we use the "TEMPORAL" keyword rather than “PERIOD FOR
> > > > SYSTEM_TIME” is to make it easy for users to understand.
> > > >
> > > > 1.3 An append-only table can be converted to a changelog table, which
> > > > has been discussed in FLIP-105. We assume the following temporal table
> > > > comes from a changelog (Jark, Fabian, Timo).
> > > >
> > > > 1.4 For the temporal join syntax, using "FOR SYSTEM_TIME AS OF x"
> > > > instead of the current `LATERAL TABLE(rates(x))` has been agreed on
> > > > (Fabian, Timo, Seth, Konstantin, Kurt).
> > > >
> > > >
> > > > 2. The small divergence:
> > > >
> > > > About the definition syntax of the temporal table,
> > > >
> > > > CREATE [TEMPORAL] TABLE rates (
> > > >    currency CHAR(3) NOT NULL PRIMARY KEY,
> > > >    rate DOUBLE,
> > > >    rowtime TIMESTAMP,
> > > >    WATERMARK FOR rowtime AS rowtime - INTERVAL '5' MINUTE)
> > > > WITH (...);
> > > >
> > > > there is a small divergence on whether to add the "TEMPORAL" keyword
> > > > or not.
> > > >
> > > > 2.1 One opinion is to use "CREATE TEMPORAL TABLE" (Timo, Fabian,
> > > > Seth). The main advantages are:
> > > >
> > > > (1) The "TEMPORAL" keyword intuitively indicates the history tracking
> > > > semantics.
> > > >
> > > > (2) The "TEMPORAL" keyword illustrates that queries can visit the
> > > > previous versions of a table, like other DBMSs use the "PERIOD FOR
> > > > SYSTEM_TIME" keyword.
> > > >
> > > >
> > > > 2.2 The other is to use "CREATE TABLE" (Kurt). The main advantages
> > > > are:
> > > >
> > > > (1) A primary key and a time attribute alone can track the previous
> > > > versions of a table well.
> > > >
> > > > (2) The temporal behavior is triggered by the temporal join syntax
> > > > rather than by the DDL. All Flink DDL tables are logically dynamic
> > > > tables, including temporal tables. If we decide to use the "TEMPORAL"
> > > > keyword and treat a changelog as a temporal table, other tables backed
> > > > by a queue like Kafka should also use the "TEMPORAL" keyword.
> > > >
> > > >
> > > >
> > > > IMO, the statement "CREATE TEMPORARY TEMPORAL TABLE..." that follows
> > > > from 2.1 may confuse users a lot. If we take a second to think about
> > > > it, for source/sink tables, which may be backed by a queue (like
> > > > Kafka) or a DB (like MySQL), we did not add any keyword in the DDL to
> > > > specify whether they are sources or sinks, and it works well.
> > > >
> > > > I think the temporal table is the third role: a Kafka data source or a
> > > > DB data source can act as a source/sink/temporal table depending on
> > > > the position/syntax in which the user puts it in the query. The above
> > > > rates table
> > > >
> > > >     - can be a source table if the user puts it in `SELECT * FROM
> > > >       rates;`
> > > >     - can be a temporal table if the user puts it in `SELECT * FROM
> > > >       orders JOIN rates FOR SYSTEM_TIME AS OF orders.proctime
> > > >       ON orders.currency = rates.currency;`
> > > >     - can be a sink table if the user puts it in `INSERT INTO rates
> > > >       SELECT * FROM …;`
> > > >
> > > > From these cases, we found that all tables defined in Flink should be
> > > > dynamic tables logically; the source/sink/temporal role depends on the
> > > > position/syntax in the user’s query.
> > > >
> > > > In fact we have used similar syntax for the current lookup table: we
> > > > didn’t add a "LOOKUP" or "TEMPORAL" keyword for the lookup table, and
> > > > we trigger the temporal join from the position/syntax ("FOR
> > > > SYSTEM_TIME AS OF x") in the query.
> > > >
> > > >
> > > > So, I prefer to resolve the small divergence with “CREATE TABLE”,
> > > > which
> > > >
> > > > (1) is more unified with our source/sink/temporal dynamic table
> > > > concept,
> > > > (2) is aligned with the current lookup table,
> > > > (3) also lets users learn fewer keywords.
> > > >
> > > > WDYT?
> > > >
> > > > Best,
> > > > Leonard Xu
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > --
> > > >
> > > > Konstantin Knauf
> > > >
> > > > https://twitter.com/snntrable
> > > >
> > > > https://github.com/knaufk
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > >
> >
> >
> > --
> > Best, Jingsong Lee
> >
>
>
> --
>
> Konstantin Knauf
>
> https://twitter.com/snntrable
>
> https://github.com/knaufk
>