You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Elias Levy <fe...@gmail.com> on 2018/02/20 23:07:44 UTC

SQL materialized upsert tables

I noticed that has been significant work on the SQL / Table subsystem and
decided to evaluate it for one of our use cases.  The use case requires the
joining of two streams, which can be considered a stream of table upserts.
Critically, when joining the streams, we only want to join against the
latest value per key in one of the tables/streams.

Simply performing a join between the stream/tables is not sufficient, as it
will generate result of records other than the latest one.  E.g. if you
have two steam/tables with schema:

Telemetry [
  tstamp: Long
  item: String
  score: Int
  source: String
]

Scores [
  tstamp: Long
  item: String
  score: Int
]

tableEnv.sqlQuery("""
SELECT s.tstamp, s.item, s.score, t.source
FROM    Telemetry t INNER JOIN Scores s ON s.item = t.item
WHERE s.score <> t.score AND s.tstamp >= t.tstamp
""")

If the stream receives 3 records from the telemetry stream for the same
source and then a record that matches the item from the score stream that
updates the score, it will generate three output records, even though we
only want the latest record from the source to be considered.

If this were a regular database we could do the following to only get the
latest records with the telemetry table:

tableEnv.sqlQuery("""
SELECT a.tstamp, a.item, a.score, a.source
FROM Telemetry a
  INNER JOIN (
    SELECT MAX(tstamp), item, source
    FROM Telemetry
    GROUP BY item, source
  ) b ON a.item = b.item AND a.source = b.source
""")

and then execute the previous query against this LatestTelemetry table
instead of Telemetry.  But that does not work.  The query executed within
Flink, but still outputs multiple records, regardless of the order the
records come into the source streams.

I am wondering if there is a way to accomplish this within Flink's
SQL/Table abstractions.

Kafka Streams has the concept of a KTable, where records are considered
upserts and update previous records that have the same key.  Thus, when you
join against a KTable, you only join against the latest record for a given
key, rather than all previous records for the key.

Thoughts?

Re: SQL materialized upsert tables

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Elias,

it would be great if you could let us know if the approach works.

Btw. I should point out that the join in a query like:

SELECT s.tstamp, s.item, s.score, t.source
FROM    (
SELECT item, source, MAX(tstamp), LAST_VAL(score, tstamp)
FROM Telemetry
GROUP BY item, source
) INNER JOIN Scores s ON s.item = t.item
WHERE s.score <> t.score

is a full-history join and will fully materialize both inputs, the upserted
Telemetry table and the append-only Scores table.
The query would hold a left join state with one row per item-store
combination in Telemetry, and a right join state for each row of Scores.
Moreover, each update on the Telemetry table will change the output for all
rows of Scores that are affected by the update.
You can configure a state retention time. This will clean up state per key
(in case of the join above based on the equi-join attribute) if a key did
not receive new data within the retention time.

The typical use case for full-history joins is to join two upserted or
GROUP-BY-aggregated tables, i.e,. tables that are updated but remain more
or less constant in size.

Best, Fabian

2018-02-21 20:00 GMT+01:00 Elias Levy <fe...@gmail.com>:

> On Wed, Feb 21, 2018 at 3:24 AM, Fabian Hueske <fh...@gmail.com> wrote:
>
>> Hi Elias,
>>
>> Flink does not have built-in support for upsert stream -> table
>> conversions, yet. However, the community is working on that (see FLINK-8545
>> [1]).
>> With a workaround, you can also solve the issue with what Flink supports
>> so far.
>>
>
> Fabian,
>
> Thanks for the reply.  Great to see some progress on this area.  If we
> could implement this job in Flink rather than Kafka Stream it would mean
> one less technology to support and to train our developers on, which is
> always a plus.
>
>
>
>> The approach with the MAX(tstamp) query was good idea, but the query
>> needs another join predicate on time.
>>
>> tableEnv.sqlQuery("""
>> SELECT a.tstamp, a.item, a.score, a.source
>> FROM Telemetry a
>>   INNER JOIN (
>>     SELECT MAX(tstamp) AS maxT, item, source
>>     FROM Telemetry
>>     GROUP BY item, source
>>   ) b ON a.item = b.item AND a.source = b.source AND a.tstamp = maxT
>> """)
>>
>> Otherwise, the table will have multiple records for each combination of
>> item and score as you noticed.
>>
>> HOWEVER, you might not want to use the query above because it will
>> accumulate all records from Telemetry in state and never clean them up.
>> The reason for this is that the query planner is not smart enough yet to
>> infer that old records will never be joined (this is implied by the join
>> condition on time).
>>
>
> Thanks for the correction.  But, yes, the indefinite accumulation is a
> deal breakers for using this approach.
>
>
> A better solution is to use a custom user-defined aggregation function [2]
>> (LAST_VAL) that returns the value with associated max timestamp.
>>
>> SELECT item, source, MAX(tstamp), LAST_VAL(score, tstamp)
>> FROM Telemetry
>> GROUP BY item, source
>>
>> LAST_VAL would have an accumulator that stores a score and its associated
>> timestamp.
>> When a new (score, timestamp) pair is accumulated, the UDAGG compares the
>> timestamps and only updates the accumulator if the new timestamp is larger.
>>
>
> I'll give this approach a try.
>
>
> Btw. I'm not sure if KStreams only updates the KTable if the update has a
>> higher timestamp or just take the last received record.
>> That might be an issue with out-of-order data. I would check the behavior
>> if you expect data with out-of-order timestamps.
>>
>
> I believe you are correct.  KS will attempt to consume records from across
> partitions by attempting to align their timestamps, but it won't reorder
> records within a partition, which can be problematic if you can't guarantee
> ordered records within a partition.  While I talked about KTables, in
> reality the job I wrote is a combination of the KS Stream DSL and Operator
> API, to get around some of these issues.
>
> The upsert stream table conversion that we are working on will support
>> event time (max timestamp) or processing time (last value) upserts.
>>
>
> Excellent.
>
>
> Best, Fabian
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-8545
>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.4/
>> dev/table/udfs.html#aggregation-functions
>>
>

Re: SQL materialized upsert tables

Posted by Elias Levy <fe...@gmail.com>.
On Wed, Feb 21, 2018 at 3:24 AM, Fabian Hueske <fh...@gmail.com> wrote:

> Hi Elias,
>
> Flink does not have built-in support for upsert stream -> table
> conversions, yet. However, the community is working on that (see FLINK-8545
> [1]).
> With a workaround, you can also solve the issue with what Flink supports
> so far.
>

Fabian,

Thanks for the reply.  Great to see some progress on this area.  If we
could implement this job in Flink rather than Kafka Stream it would mean
one less technology to support and to train our developers on, which is
always a plus.



> The approach with the MAX(tstamp) query was good idea, but the query needs
> another join predicate on time.
>
> tableEnv.sqlQuery("""
> SELECT a.tstamp, a.item, a.score, a.source
> FROM Telemetry a
>   INNER JOIN (
>     SELECT MAX(tstamp) AS maxT, item, source
>     FROM Telemetry
>     GROUP BY item, source
>   ) b ON a.item = b.item AND a.source = b.source AND a.tstamp = maxT
> """)
>
> Otherwise, the table will have multiple records for each combination of
> item and score as you noticed.
>
> HOWEVER, you might not want to use the query above because it will
> accumulate all records from Telemetry in state and never clean them up.
> The reason for this is that the query planner is not smart enough yet to
> infer that old records will never be joined (this is implied by the join
> condition on time).
>

Thanks for the correction.  But, yes, the indefinite accumulation is a deal
breakers for using this approach.


A better solution is to use a custom user-defined aggregation function [2]
> (LAST_VAL) that returns the value with associated max timestamp.
>
> SELECT item, source, MAX(tstamp), LAST_VAL(score, tstamp)
> FROM Telemetry
> GROUP BY item, source
>
> LAST_VAL would have an accumulator that stores a score and its associated
> timestamp.
> When a new (score, timestamp) pair is accumulated, the UDAGG compares the
> timestamps and only updates the accumulator if the new timestamp is larger.
>

I'll give this approach a try.


Btw. I'm not sure if KStreams only updates the KTable if the update has a
> higher timestamp or just take the last received record.
> That might be an issue with out-of-order data. I would check the behavior
> if you expect data with out-of-order timestamps.
>

I believe you are correct.  KS will attempt to consume records from across
partitions by attempting to align their timestamps, but it won't reorder
records within a partition, which can be problematic if you can't guarantee
ordered records within a partition.  While I talked about KTables, in
reality the job I wrote is a combination of the KS Stream DSL and Operator
API, to get around some of these issues.

The upsert stream table conversion that we are working on will support
> event time (max timestamp) or processing time (last value) upserts.
>

Excellent.


Best, Fabian
>
> [1] https://issues.apache.org/jira/browse/FLINK-8545
> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.4/
> dev/table/udfs.html#aggregation-functions
>

Re: SQL materialized upsert tables

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Elias,

Flink does not have built-in support for upsert stream -> table
conversions, yet. However, the community is working on that (see FLINK-8545
[1]).
With a workaround, you can also solve the issue with what Flink supports so
far.

The approach with the MAX(tstamp) query was good idea, but the query needs
another join predicate on time.

tableEnv.sqlQuery("""
SELECT a.tstamp, a.item, a.score, a.source
FROM Telemetry a
  INNER JOIN (
    SELECT MAX(tstamp) AS maxT, item, source
    FROM Telemetry
    GROUP BY item, source
  ) b ON a.item = b.item AND a.source = b.source AND a.tstamp = maxT
""")

Otherwise, the table will have multiple records for each combination of
item and score as you noticed.

HOWEVER, you might not want to use the query above because it will
accumulate all records from Telemetry in state and never clean them up.
The reason for this is that the query planner is not smart enough yet to
infer that old records will never be joined (this is implied by the join
condition on time).

A better solution is to use a custom user-defined aggregation function [2]
(LAST_VAL) that returns the value with associated max timestamp.

SELECT item, source, MAX(tstamp), LAST_VAL(score, tstamp)
FROM Telemetry
GROUP BY item, source

LAST_VAL would have an accumulator that stores a score and its associated
timestamp.
When a new (score, timestamp) pair is accumulated, the UDAGG compares the
timestamps and only updates the accumulator if the new timestamp is larger.

Btw. I'm not sure if KStreams only updates the KTable if the update has a
higher timestamp or just take the last received record.
That might be an issue with out-of-order data. I would check the behavior
if you expect data with out-of-order timestamps.

The upsert stream table conversion that we are working on will support
event time (max timestamp) or processing time (last value) upserts.

Best, Fabian

[1] https://issues.apache.org/jira/browse/FLINK-8545
[2] https://ci.apache.org/projects/flink/flink-docs-
release-1.4/dev/table/udfs.html#aggregation-functions

2018-02-21 1:06 GMT+01:00 Elias Levy <fe...@gmail.com>:

> [ Adding the list back in, as this clarifies my question ]
>
> On Tue, Feb 20, 2018 at 3:42 PM, Darshan Singh <da...@gmail.com>
> wrote:
>
>> I am no expert in Flink but I will try my best. Issue you mentioned will
>> be with all streaming systems even with Kafka KTable I use them a lot for
>> similar sort of requirements.
>>
>> In Kafka you have KTable on Telemetry with 3 records and join with say
>> Scores which could be KTable or Kstrem  and you start your streaming query
>> as mentioned above it will give just 1 row as expected. However, if there
>> is a new value for the same key with timestamp greater than previous max
>> will be added to the Telemetry it will output the new value as well and
>> that is main idea about the streaming anyway you want to see the changed
>> value. So once you started streaming you will get whatever is the outcome
>> of your
>>
>
> Darshan,
>
> Thanks for the reply.  I've already implemented this job using Kafka
> Streams, so I am aware of how KTables behaves.  I would have helped if I
> had included some sample data in my post, so here it is.  If you have this
> data coming into Telemetry:
>
> ts, item, score, source
> 0, item1, 1, source1
> 1, item1, 1, source1
> 2, item1, 1, source1
>
> And this comes into Scores:
>
> ts, item, score
> 3, item1, 3
>
> Flink will output 3 records from the queries I mentioned:
>
> (3, item1, 3, source1)
> (3, item1, 3, source1)
> (3, item1, 3, source1)
>
>
> In contrast, if you run the query in Kafka Stream configuring Telemetry as
> a KTable keyed by (item, source), the output will be a single record.  In
> Telemetry record for key (item1, source1) at time 1 will overwrite the
> record at time 0, and the record at time 2 will overwrite the one at time
> 1.  By the time the record at time 3 comes in via Scores, it will be joined
> only with the record from time 2 in Telemetry.
>
> Yes, it is possible for the Kafka Streams query to output multiple records
> if the records from the different streams are not time aligned, as Kafka
> Streams only guarantees a best effort aligning the streams. But in the
> common case the output will be a single record.
>
>
> I think in fllink you can do the same, from your telemeter stream/table
>> you can create the LatestTelemetry table using similar sql(I am sure it
>> should give you latest timestamp's data) as you did with the RDBMS and then
>> join with scores table. You should get similar results to KTable or any
>> other streaming system.
>>
>
> Not sure if you missed it, but I actually executed the query to define the
> LatestTelemetry table in Flink using that query and joined against it.  The
> output was the same three records.
>
>

Re: SQL materialized upsert tables

Posted by Elias Levy <fe...@gmail.com>.
[ Adding the list back in, as this clarifies my question ]

On Tue, Feb 20, 2018 at 3:42 PM, Darshan Singh <da...@gmail.com>
wrote:

> I am no expert in Flink but I will try my best. Issue you mentioned will
> be with all streaming systems even with Kafka KTable I use them a lot for
> similar sort of requirements.
>
> In Kafka you have KTable on Telemetry with 3 records and join with say
> Scores which could be KTable or Kstrem  and you start your streaming query
> as mentioned above it will give just 1 row as expected. However, if there
> is a new value for the same key with timestamp greater than previous max
> will be added to the Telemetry it will output the new value as well and
> that is main idea about the streaming anyway you want to see the changed
> value. So once you started streaming you will get whatever is the outcome
> of your
>

Darshan,

Thanks for the reply.  I've already implemented this job using Kafka
Streams, so I am aware of how KTables behaves.  I would have helped if I
had included some sample data in my post, so here it is.  If you have this
data coming into Telemetry:

ts, item, score, source
0, item1, 1, source1
1, item1, 1, source1
2, item1, 1, source1

And this comes into Scores:

ts, item, score
3, item1, 3

Flink will output 3 records from the queries I mentioned:

(3, item1, 3, source1)
(3, item1, 3, source1)
(3, item1, 3, source1)


In contrast, if you run the query in Kafka Stream configuring Telemetry as
a KTable keyed by (item, source), the output will be a single record.  In
Telemetry record for key (item1, source1) at time 1 will overwrite the
record at time 0, and the record at time 2 will overwrite the one at time
1.  By the time the record at time 3 comes in via Scores, it will be joined
only with the record from time 2 in Telemetry.

Yes, it is possible for the Kafka Streams query to output multiple records
if the records from the different streams are not time aligned, as Kafka
Streams only guarantees a best effort aligning the streams. But in the
common case the output will be a single record.


I think in fllink you can do the same, from your telemeter stream/table you
> can create the LatestTelemetry table using similar sql(I am sure it should
> give you latest timestamp's data) as you did with the RDBMS and then join
> with scores table. You should get similar results to KTable or any other
> streaming system.
>

Not sure if you missed it, but I actually executed the query to define the
LatestTelemetry table in Flink using that query and joined against it.  The
output was the same three records.