You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by meneldor <me...@gmail.com> on 2021/02/04 11:22:26 UTC

Kafka connector doesn't support consuming update and delete changes in Table SQL API

Hello,
Flink 1.12.1(pyflink)
I am deduplicating CDC records coming from Maxwell in a kafka topic.  Here
is the SQL:

CREATE TABLE stats_topic(
>           `data` ROW<`id` BIGINT, `account` INT, `upd_ts` BIGINT>,
>           `ts` BIGINT,
>           `xid` BIGINT ,
>           row_ts AS TO_TIMESTAMP(FROM_UNIXTIME(data.upd_ts)),
>           WATERMARK FOR `row_ts` AS `row_ts`   - INTERVAL '15' SECOND
>         ) WITH (
>           'connector' = 'kafka',
>           'format' = 'json',
>           'topic' = 'stats_topic',
>           'properties.bootstrap.servers' = 'localhost:9092',
>           'properties.group.id' = 'test_group'
>         )
>
> CREATE TABLE sink_table(
>           `id` BIGINT,
>           `account` INT,
>           `upd_ts` BIGINT
>         ) WITH (
>           'connector' = 'kafka',
>           'format' = 'json',
>           'topic' = 'sink_topic',
>           'properties.bootstrap.servers' = 'localhost:9092',
>           'properties.group.id' = 'test_group'
>         )
>
>
> INSERT INTO sink_table
> SELECT
> id,
> account,
> upd_ts
> FROM (
> SELECT
>  id,
>  account,
>  upd_ts,
>  ROW_NUMBER() OVER (PARTITION BY id ORDER BY upd_ts desc) AS rownum
> FROM stats_topic
> GROUP BY id, account, upd_ts, TUMBLE(row_ts, INTERVAL '20' MINUTE)
> )
> WHERE rownum=1
>

 As there are a lot of CDC records for a single ID im using ROW_NUMBER()
and produce them on a 20 minutes interval to the sink_topic. The problem is
that flink doesnt allow me to use it in combination with with the kafka
connector:

> pyflink.util.exceptions.TableException: Table sink
> 'default_catalog.default_database.sink_table' doesn't support consuming
> update and delete changes which is produced by node
> Rank(strategy=[UndefinedStrategy], rankType=[ROW_NUMBER],
> rankRange=[rankStart=1, rankEnd=1], partitionBy=[$f0], orderBy=[$f2 DESC],
> select=[$f0, $f1, $f2])
>

If I use the* upsert-kafka* connector everything is fine but then i receive
empty JSON records in the sink topic:

> {"id": 111111, "account": 4, "upd_ts": 1612334952}
> {"id": 222222, "account": 4, "upd_ts": 1612334953}
> {}
> {"id": 333333, "account": 4, "upd_ts": 1612334955}
> {}
> {"id": 444444, "account": 4, "upd_ts": 1612334956}
>

Thank you!

Re: Kafka connector doesn't support consuming update and delete changes in Table SQL API

Posted by meneldor <me...@gmail.com>.
The query which I'm testing now(trying to avoid the deduplication query
because of tombstones) is *almost* correct but there are two questions
which I can find an answer to:
1. Some of the *id*'s are just stopping to be produced.
2. Does the Tuble window select only the records whose upd_ts is new or the
query will always produce all the records in the dynamic table *stats_topic*
with the max(upd_ts)?

CREATE TABLE stats_topic(
  `data` ROW<`id` BIGINT, `account` INT, `upd_ts` BIGINT>,
  `ts` BIGINT,
  `xid` BIGINT ,
  row_ts AS TO_TIMESTAMP(FROM_UNIXTIME(data.upd_ts)),
  WATERMARK FOR `row_ts` AS `row_ts`   - INTERVAL '15' SECOND
) WITH (
  'connector' = 'kafka',
  'format' = 'json',
  'topic' = 'stats_topic',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'test_group'
)


INSERT INTO sink_table
SELECT distinct id, account, upd_ts
FROM stats_topic t, (
   SELECT id, max(upd_ts) as maxTs,
   FROM stats_topic
   GROUP BY id, TUMBLE(row_ts, INTERVAL '20' MINUTE)
) s
WHERE  t.id = s.id AND t.upd_ts = s.maxTs


Thank you!

On Thu, Feb 11, 2021 at 1:36 PM meneldor <me...@gmail.com> wrote:

> Are you sure that the null records are not actually tombstone records? If
>> you use upsert tables you usually want to have them + compaction. Or how
>> else will you deal with deletions?
>
> yes they are tombstone records, but i cannot avoid them because the
> deduplication query cant produce an append-only connector on a LastRow.
>
> What do you want to achieve? CDC records should be deduplicated by
>> definition.
>> I'm assuming that you want to aggregate the state to the current state.
>> If so, how do you decide when the record is complete (e.g. no future
>> updates) and can be written?
>> I have the feeling that you are using CDC at a place where you don't want
>> to use it, so maybe it helps to first explain your use case. Is stream
>> processing a good fit for you in the first place?
>
> Yes, I want to aggregate the state to the current state. The problem is
> that the records are gonna be merged in a database by an ETL every hour. So
> i don't need all the updates but only the last one, thats why im using a
> window function and the future updates will be evaluated by the MERGE query
> in the ETL too.
>
> I've changed the query to instead use max(upd_ts) which is producing to
> append only stream and it works but im not 100% sure if the result is the
> same:
>
>> INSERT INTO sink_table
>> SELECT distinct id, account, upd_ts
>> FROM stats_topic t, (
>>    SELECT id, account, max(upd_ts) as maxTs,
>>    FROM stats_topic
>>    GROUP BY id, account, TUMBLE(row_ts, INTERVAL '20' MINUTE)
>> ) s
>> WHERE  t.id = s.id AND t.upd_ts = s.maxTs AND t.account = s.account
>
>
> Thanks!
>
> On Thu, Feb 11, 2021 at 12:55 PM Arvid Heise <ar...@apache.org> wrote:
>
>> Hi,
>>
>> Are you sure that the null records are not actually tombstone records? If
>> you use upsert tables you usually want to have them + compaction. Or how
>> else will you deal with deletions?
>>
>> Is there anyone who is successfully deduplicating CDC records into either
>>> kafka topic or S3 files(CSV/parquet) ?
>>>
>> What do you want to achieve? CDC records should be deduplicated by
>> definition.
>> I'm assuming that you want to aggregate the state to the current state.
>> If so, how do you decide when the record is complete (e.g. no future
>> updates) and can be written?
>>
>> I have the feeling that you are using CDC at a place where you don't want
>> to use it, so maybe it helps to first explain your use case. Is stream
>> processing a good fit for you in the first place?
>>
>> On Tue, Feb 9, 2021 at 10:37 AM meneldor <me...@gmail.com> wrote:
>>
>>> Unfortunately using row_ts doesn't help. Setting the kafka
>>> topic cleanup.policy to compact is not a very good idea as it increases
>>> cpu, memory and might lead to other problems.
>>> So for now I'll just ignore the null records. Is there anyone who is
>>> successfully deduplicating CDC records into either kafka topic or S3
>>> files(CSV/parquet) ?
>>>
>>> Thanks!
>>>
>>> On Mon, Feb 8, 2021 at 7:13 PM meneldor <me...@gmail.com> wrote:
>>>
>>>> Thanks for the quick reply, Timo. Ill test with the  row_ts and
>>>> compaction mode suggestions. However, ive read somewhere in the archives
>>>> that the append only stream is only possible if i extract "the first"
>>>> record from the ranking only which in my case is the oldest record.
>>>>
>>>> Regards
>>>>
>>>> On Mon, Feb 8, 2021, 18:56 Timo Walther <tw...@apache.org> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> could the problem be that you are mixing OVER and TUMBLE window with
>>>>> each other? The TUMBLE is correctly defined over time attribute
>>>>> `row_ts`
>>>>> but the OVER window is defined using a regular column `upd_ts`. This
>>>>> might be the case why the query is not append-only but updating.
>>>>>
>>>>> Maybe you can split the problem into sub queries and share the plan
>>>>> with
>>>>> us using .explain()?
>>>>>
>>>>> The nulls in upsert-kafka should be gone once you enable compaction
>>>>> mode
>>>>> in Kafka.
>>>>>
>>>>> I hope this helps.
>>>>>
>>>>> Regards,
>>>>> Timo
>>>>>
>>>>>
>>>>> On 08.02.21 10:53, Khachatryan Roman wrote:
>>>>> > Hi,
>>>>> >
>>>>> > AFAIK this should be supported in 1.12 via FLINK-19568 [1]
>>>>> > I'm pulling in Timo and Jark who might know better.
>>>>> >
>>>>> > https://issues.apache.org/jira/browse/FLINK-19857
>>>>> > <https://issues.apache.org/jira/browse/FLINK-19857>
>>>>> >
>>>>> > Regards,
>>>>> > Roman
>>>>> >
>>>>> >
>>>>> > On Mon, Feb 8, 2021 at 9:14 AM meneldor <meneldor@gmail.com
>>>>> > <ma...@gmail.com>> wrote:
>>>>> >
>>>>> >     Any help please? Is there a way to use the "Last row" from a
>>>>> >     deduplication in an append-only stream or tell upsert-kafka to
>>>>> not
>>>>> >     produce *null* records in the sink?
>>>>> >
>>>>> >     Thank you
>>>>> >
>>>>> >     On Thu, Feb 4, 2021 at 1:22 PM meneldor <meneldor@gmail.com
>>>>> >     <ma...@gmail.com>> wrote:
>>>>> >
>>>>> >         Hello,
>>>>> >         Flink 1.12.1(pyflink)
>>>>> >         I am deduplicating CDC records coming from Maxwell in a kafka
>>>>> >         topic.  Here is the SQL:
>>>>> >
>>>>> >             CREATE TABLE stats_topic(
>>>>> >                        `data` ROW<`id` BIGINT, `account` INT,
>>>>> `upd_ts`
>>>>> >             BIGINT>,
>>>>> >                        `ts` BIGINT,
>>>>> >                        `xid` BIGINT ,
>>>>> >                        row_ts AS
>>>>> TO_TIMESTAMP(FROM_UNIXTIME(data.upd_ts)),
>>>>> >                        WATERMARK FOR `row_ts` AS `row_ts`   -
>>>>> INTERVAL
>>>>> >             '15' SECOND
>>>>> >                      ) WITH (
>>>>> >                        'connector' = 'kafka',
>>>>> >                        'format' = 'json',
>>>>> >                        'topic' = 'stats_topic',
>>>>> >                        'properties.bootstrap.servers' =
>>>>> 'localhost:9092',
>>>>> >                        'properties.group.id
>>>>> >             <http://properties.group.id>' = 'test_group'
>>>>> >                      )
>>>>> >
>>>>> >             CREATE TABLE sink_table(
>>>>> >                        `id` BIGINT,
>>>>> >                        `account` INT,
>>>>> >                        `upd_ts` BIGINT
>>>>> >                      ) WITH (
>>>>> >                        'connector' = 'kafka',
>>>>> >                        'format' = 'json',
>>>>> >                        'topic' = 'sink_topic',
>>>>> >                        'properties.bootstrap.servers' =
>>>>> 'localhost:9092',
>>>>> >                        'properties.group.id
>>>>> >             <http://properties.group.id>' = 'test_group'
>>>>> >                      )
>>>>> >
>>>>> >
>>>>> >             INSERT INTO sink_table
>>>>> >             SELECT
>>>>> >             id,
>>>>> >             account,
>>>>> >             upd_ts
>>>>> >             FROM (
>>>>> >             SELECT
>>>>> >               id,
>>>>> >               account,
>>>>> >               upd_ts,
>>>>> >               ROW_NUMBER() OVER (PARTITION BY id ORDER BY upd_ts
>>>>> desc)
>>>>> >             AS rownum
>>>>> >             FROM stats_topic
>>>>> >             GROUP BY id, account, upd_ts, TUMBLE(row_ts, INTERVAL
>>>>> '20'
>>>>> >             MINUTE)
>>>>> >             )
>>>>> >             WHERE rownum=1
>>>>> >
>>>>> >
>>>>> >           As there are a lot of CDC records for a single ID im using
>>>>> >         ROW_NUMBER() and produce them on a 20 minutes interval to the
>>>>> >         sink_topic. The problem is that flink doesnt allow me to use
>>>>> it
>>>>> >         in combination with with the kafka connector:
>>>>> >
>>>>> >             pyflink.util.exceptions.TableException: Table sink
>>>>> >             'default_catalog.default_database.sink_table' doesn't
>>>>> >             support consuming update and delete changes which is
>>>>> >             produced by node Rank(strategy=[UndefinedStrategy],
>>>>> >             rankType=[ROW_NUMBER], rankRange=[rankStart=1,
>>>>> rankEnd=1],
>>>>> >             partitionBy=[$f0], orderBy=[$f2 DESC], select=[$f0, $f1,
>>>>> $f2])
>>>>> >
>>>>> >
>>>>> >         If I use the*upsert-kafka* connector everything is fine but
>>>>> then
>>>>> >         i receive empty JSON records in the sink topic:
>>>>> >
>>>>> >             {"id": 111111, "account": 4, "upd_ts": 1612334952}
>>>>> >             {"id": 222222, "account": 4, "upd_ts": 1612334953}
>>>>> >             {}
>>>>> >             {"id": 333333, "account": 4, "upd_ts": 1612334955}
>>>>> >             {}
>>>>> >             {"id": 444444, "account": 4, "upd_ts": 1612334956}
>>>>> >
>>>>> >
>>>>> >         Thank you!
>>>>> >
>>>>>
>>>>>

Re: Kafka connector doesn't support consuming update and delete changes in Table SQL API

Posted by meneldor <me...@gmail.com>.
>
> Are you sure that the null records are not actually tombstone records? If
> you use upsert tables you usually want to have them + compaction. Or how
> else will you deal with deletions?

yes they are tombstone records, but i cannot avoid them because the
deduplication query cant produce an append-only connector on a LastRow.

What do you want to achieve? CDC records should be deduplicated by
> definition.
> I'm assuming that you want to aggregate the state to the current state. If
> so, how do you decide when the record is complete (e.g. no future updates)
> and can be written?
> I have the feeling that you are using CDC at a place where you don't want
> to use it, so maybe it helps to first explain your use case. Is stream
> processing a good fit for you in the first place?

Yes, I want to aggregate the state to the current state. The problem is
that the records are gonna be merged in a database by an ETL every hour. So
i don't need all the updates but only the last one, thats why im using a
window function and the future updates will be evaluated by the MERGE query
in the ETL too.

I've changed the query to instead use max(upd_ts) which is producing to
append only stream and it works but im not 100% sure if the result is the
same:

> INSERT INTO sink_table
> SELECT distinct id, account, upd_ts
> FROM stats_topic t, (
>    SELECT id, account, max(upd_ts) as maxTs,
>    FROM stats_topic
>    GROUP BY id, account, TUMBLE(row_ts, INTERVAL '20' MINUTE)
> ) s
> WHERE  t.id = s.id AND t.upd_ts = s.maxTs AND t.account = s.account


Thanks!

On Thu, Feb 11, 2021 at 12:55 PM Arvid Heise <ar...@apache.org> wrote:

> Hi,
>
> Are you sure that the null records are not actually tombstone records? If
> you use upsert tables you usually want to have them + compaction. Or how
> else will you deal with deletions?
>
> Is there anyone who is successfully deduplicating CDC records into either
>> kafka topic or S3 files(CSV/parquet) ?
>>
> What do you want to achieve? CDC records should be deduplicated by
> definition.
> I'm assuming that you want to aggregate the state to the current state. If
> so, how do you decide when the record is complete (e.g. no future updates)
> and can be written?
>
> I have the feeling that you are using CDC at a place where you don't want
> to use it, so maybe it helps to first explain your use case. Is stream
> processing a good fit for you in the first place?
>
> On Tue, Feb 9, 2021 at 10:37 AM meneldor <me...@gmail.com> wrote:
>
>> Unfortunately using row_ts doesn't help. Setting the kafka
>> topic cleanup.policy to compact is not a very good idea as it increases
>> cpu, memory and might lead to other problems.
>> So for now I'll just ignore the null records. Is there anyone who is
>> successfully deduplicating CDC records into either kafka topic or S3
>> files(CSV/parquet) ?
>>
>> Thanks!
>>
>> On Mon, Feb 8, 2021 at 7:13 PM meneldor <me...@gmail.com> wrote:
>>
>>> Thanks for the quick reply, Timo. Ill test with the  row_ts and
>>> compaction mode suggestions. However, ive read somewhere in the archives
>>> that the append only stream is only possible if i extract "the first"
>>> record from the ranking only which in my case is the oldest record.
>>>
>>> Regards
>>>
>>> On Mon, Feb 8, 2021, 18:56 Timo Walther <tw...@apache.org> wrote:
>>>
>>>> Hi,
>>>>
>>>> could the problem be that you are mixing OVER and TUMBLE window with
>>>> each other? The TUMBLE is correctly defined over time attribute
>>>> `row_ts`
>>>> but the OVER window is defined using a regular column `upd_ts`. This
>>>> might be the case why the query is not append-only but updating.
>>>>
>>>> Maybe you can split the problem into sub queries and share the plan
>>>> with
>>>> us using .explain()?
>>>>
>>>> The nulls in upsert-kafka should be gone once you enable compaction
>>>> mode
>>>> in Kafka.
>>>>
>>>> I hope this helps.
>>>>
>>>> Regards,
>>>> Timo
>>>>
>>>>
>>>> On 08.02.21 10:53, Khachatryan Roman wrote:
>>>> > Hi,
>>>> >
>>>> > AFAIK this should be supported in 1.12 via FLINK-19568 [1]
>>>> > I'm pulling in Timo and Jark who might know better.
>>>> >
>>>> > https://issues.apache.org/jira/browse/FLINK-19857
>>>> > <https://issues.apache.org/jira/browse/FLINK-19857>
>>>> >
>>>> > Regards,
>>>> > Roman
>>>> >
>>>> >
>>>> > On Mon, Feb 8, 2021 at 9:14 AM meneldor <meneldor@gmail.com
>>>> > <ma...@gmail.com>> wrote:
>>>> >
>>>> >     Any help please? Is there a way to use the "Last row" from a
>>>> >     deduplication in an append-only stream or tell upsert-kafka to not
>>>> >     produce *null* records in the sink?
>>>> >
>>>> >     Thank you
>>>> >
>>>> >     On Thu, Feb 4, 2021 at 1:22 PM meneldor <meneldor@gmail.com
>>>> >     <ma...@gmail.com>> wrote:
>>>> >
>>>> >         Hello,
>>>> >         Flink 1.12.1(pyflink)
>>>> >         I am deduplicating CDC records coming from Maxwell in a kafka
>>>> >         topic.  Here is the SQL:
>>>> >
>>>> >             CREATE TABLE stats_topic(
>>>> >                        `data` ROW<`id` BIGINT, `account` INT, `upd_ts`
>>>> >             BIGINT>,
>>>> >                        `ts` BIGINT,
>>>> >                        `xid` BIGINT ,
>>>> >                        row_ts AS
>>>> TO_TIMESTAMP(FROM_UNIXTIME(data.upd_ts)),
>>>> >                        WATERMARK FOR `row_ts` AS `row_ts`   - INTERVAL
>>>> >             '15' SECOND
>>>> >                      ) WITH (
>>>> >                        'connector' = 'kafka',
>>>> >                        'format' = 'json',
>>>> >                        'topic' = 'stats_topic',
>>>> >                        'properties.bootstrap.servers' =
>>>> 'localhost:9092',
>>>> >                        'properties.group.id
>>>> >             <http://properties.group.id>' = 'test_group'
>>>> >                      )
>>>> >
>>>> >             CREATE TABLE sink_table(
>>>> >                        `id` BIGINT,
>>>> >                        `account` INT,
>>>> >                        `upd_ts` BIGINT
>>>> >                      ) WITH (
>>>> >                        'connector' = 'kafka',
>>>> >                        'format' = 'json',
>>>> >                        'topic' = 'sink_topic',
>>>> >                        'properties.bootstrap.servers' =
>>>> 'localhost:9092',
>>>> >                        'properties.group.id
>>>> >             <http://properties.group.id>' = 'test_group'
>>>> >                      )
>>>> >
>>>> >
>>>> >             INSERT INTO sink_table
>>>> >             SELECT
>>>> >             id,
>>>> >             account,
>>>> >             upd_ts
>>>> >             FROM (
>>>> >             SELECT
>>>> >               id,
>>>> >               account,
>>>> >               upd_ts,
>>>> >               ROW_NUMBER() OVER (PARTITION BY id ORDER BY upd_ts desc)
>>>> >             AS rownum
>>>> >             FROM stats_topic
>>>> >             GROUP BY id, account, upd_ts, TUMBLE(row_ts, INTERVAL '20'
>>>> >             MINUTE)
>>>> >             )
>>>> >             WHERE rownum=1
>>>> >
>>>> >
>>>> >           As there are a lot of CDC records for a single ID im using
>>>> >         ROW_NUMBER() and produce them on a 20 minutes interval to the
>>>> >         sink_topic. The problem is that flink doesnt allow me to use
>>>> it
>>>> >         in combination with with the kafka connector:
>>>> >
>>>> >             pyflink.util.exceptions.TableException: Table sink
>>>> >             'default_catalog.default_database.sink_table' doesn't
>>>> >             support consuming update and delete changes which is
>>>> >             produced by node Rank(strategy=[UndefinedStrategy],
>>>> >             rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1],
>>>> >             partitionBy=[$f0], orderBy=[$f2 DESC], select=[$f0, $f1,
>>>> $f2])
>>>> >
>>>> >
>>>> >         If I use the*upsert-kafka* connector everything is fine but
>>>> then
>>>> >         i receive empty JSON records in the sink topic:
>>>> >
>>>> >             {"id": 111111, "account": 4, "upd_ts": 1612334952}
>>>> >             {"id": 222222, "account": 4, "upd_ts": 1612334953}
>>>> >             {}
>>>> >             {"id": 333333, "account": 4, "upd_ts": 1612334955}
>>>> >             {}
>>>> >             {"id": 444444, "account": 4, "upd_ts": 1612334956}
>>>> >
>>>> >
>>>> >         Thank you!
>>>> >
>>>>
>>>>

Re: Kafka connector doesn't support consuming update and delete changes in Table SQL API

Posted by Arvid Heise <ar...@apache.org>.
Hi,

Are you sure that the null records are not actually tombstone records? If
you use upsert tables you usually want to have them + compaction. Or how
else will you deal with deletions?

Is there anyone who is successfully deduplicating CDC records into either
> kafka topic or S3 files(CSV/parquet) ?
>
What do you want to achieve? CDC records should be deduplicated by
definition.
I'm assuming that you want to aggregate the state to the current state. If
so, how do you decide when the record is complete (e.g. no future updates)
and can be written?

I have the feeling that you are using CDC at a place where you don't want
to use it, so maybe it helps to first explain your use case. Is stream
processing a good fit for you in the first place?

On Tue, Feb 9, 2021 at 10:37 AM meneldor <me...@gmail.com> wrote:

> Unfortunately using row_ts doesn't help. Setting the kafka
> topic cleanup.policy to compact is not a very good idea as it increases
> cpu, memory and might lead to other problems.
> So for now I'll just ignore the null records. Is there anyone who is
> successfully deduplicating CDC records into either kafka topic or S3
> files(CSV/parquet) ?
>
> Thanks!
>
> On Mon, Feb 8, 2021 at 7:13 PM meneldor <me...@gmail.com> wrote:
>
>> Thanks for the quick reply, Timo. Ill test with the  row_ts and
>> compaction mode suggestions. However, ive read somewhere in the archives
>> that the append only stream is only possible if i extract "the first"
>> record from the ranking only which in my case is the oldest record.
>>
>> Regards
>>
>> On Mon, Feb 8, 2021, 18:56 Timo Walther <tw...@apache.org> wrote:
>>
>>> Hi,
>>>
>>> could the problem be that you are mixing OVER and TUMBLE window with
>>> each other? The TUMBLE is correctly defined over time attribute `row_ts`
>>> but the OVER window is defined using a regular column `upd_ts`. This
>>> might be the case why the query is not append-only but updating.
>>>
>>> Maybe you can split the problem into sub queries and share the plan with
>>> us using .explain()?
>>>
>>> The nulls in upsert-kafka should be gone once you enable compaction mode
>>> in Kafka.
>>>
>>> I hope this helps.
>>>
>>> Regards,
>>> Timo
>>>
>>>
>>> On 08.02.21 10:53, Khachatryan Roman wrote:
>>> > Hi,
>>> >
>>> > AFAIK this should be supported in 1.12 via FLINK-19568 [1]
>>> > I'm pulling in Timo and Jark who might know better.
>>> >
>>> > https://issues.apache.org/jira/browse/FLINK-19857
>>> > <https://issues.apache.org/jira/browse/FLINK-19857>
>>> >
>>> > Regards,
>>> > Roman
>>> >
>>> >
>>> > On Mon, Feb 8, 2021 at 9:14 AM meneldor <meneldor@gmail.com
>>> > <ma...@gmail.com>> wrote:
>>> >
>>> >     Any help please? Is there a way to use the "Last row" from a
>>> >     deduplication in an append-only stream or tell upsert-kafka to not
>>> >     produce *null* records in the sink?
>>> >
>>> >     Thank you
>>> >
>>> >     On Thu, Feb 4, 2021 at 1:22 PM meneldor <meneldor@gmail.com
>>> >     <ma...@gmail.com>> wrote:
>>> >
>>> >         Hello,
>>> >         Flink 1.12.1(pyflink)
>>> >         I am deduplicating CDC records coming from Maxwell in a kafka
>>> >         topic.  Here is the SQL:
>>> >
>>> >             CREATE TABLE stats_topic(
>>> >                        `data` ROW<`id` BIGINT, `account` INT, `upd_ts`
>>> >             BIGINT>,
>>> >                        `ts` BIGINT,
>>> >                        `xid` BIGINT ,
>>> >                        row_ts AS
>>> TO_TIMESTAMP(FROM_UNIXTIME(data.upd_ts)),
>>> >                        WATERMARK FOR `row_ts` AS `row_ts`   - INTERVAL
>>> >             '15' SECOND
>>> >                      ) WITH (
>>> >                        'connector' = 'kafka',
>>> >                        'format' = 'json',
>>> >                        'topic' = 'stats_topic',
>>> >                        'properties.bootstrap.servers' =
>>> 'localhost:9092',
>>> >                        'properties.group.id
>>> >             <http://properties.group.id>' = 'test_group'
>>> >                      )
>>> >
>>> >             CREATE TABLE sink_table(
>>> >                        `id` BIGINT,
>>> >                        `account` INT,
>>> >                        `upd_ts` BIGINT
>>> >                      ) WITH (
>>> >                        'connector' = 'kafka',
>>> >                        'format' = 'json',
>>> >                        'topic' = 'sink_topic',
>>> >                        'properties.bootstrap.servers' =
>>> 'localhost:9092',
>>> >                        'properties.group.id
>>> >             <http://properties.group.id>' = 'test_group'
>>> >                      )
>>> >
>>> >
>>> >             INSERT INTO sink_table
>>> >             SELECT
>>> >             id,
>>> >             account,
>>> >             upd_ts
>>> >             FROM (
>>> >             SELECT
>>> >               id,
>>> >               account,
>>> >               upd_ts,
>>> >               ROW_NUMBER() OVER (PARTITION BY id ORDER BY upd_ts desc)
>>> >             AS rownum
>>> >             FROM stats_topic
>>> >             GROUP BY id, account, upd_ts, TUMBLE(row_ts, INTERVAL '20'
>>> >             MINUTE)
>>> >             )
>>> >             WHERE rownum=1
>>> >
>>> >
>>> >           As there are a lot of CDC records for a single ID im using
>>> >         ROW_NUMBER() and produce them on a 20 minutes interval to the
>>> >         sink_topic. The problem is that flink doesnt allow me to use it
>>> >         in combination with with the kafka connector:
>>> >
>>> >             pyflink.util.exceptions.TableException: Table sink
>>> >             'default_catalog.default_database.sink_table' doesn't
>>> >             support consuming update and delete changes which is
>>> >             produced by node Rank(strategy=[UndefinedStrategy],
>>> >             rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1],
>>> >             partitionBy=[$f0], orderBy=[$f2 DESC], select=[$f0, $f1,
>>> $f2])
>>> >
>>> >
>>> >         If I use the*upsert-kafka* connector everything is fine but
>>> then
>>> >         i receive empty JSON records in the sink topic:
>>> >
>>> >             {"id": 111111, "account": 4, "upd_ts": 1612334952}
>>> >             {"id": 222222, "account": 4, "upd_ts": 1612334953}
>>> >             {}
>>> >             {"id": 333333, "account": 4, "upd_ts": 1612334955}
>>> >             {}
>>> >             {"id": 444444, "account": 4, "upd_ts": 1612334956}
>>> >
>>> >
>>> >         Thank you!
>>> >
>>>
>>>

Re: Kafka connector doesn't support consuming update and delete changes in Table SQL API

Posted by meneldor <me...@gmail.com>.
Unfortunately using row_ts doesn't help. Setting the kafka
topic cleanup.policy to compact is not a very good idea as it increases
cpu, memory and might lead to other problems.
So for now I'll just ignore the null records. Is there anyone who is
successfully deduplicating CDC records into either kafka topic or S3
files(CSV/parquet) ?

Thanks!

On Mon, Feb 8, 2021 at 7:13 PM meneldor <me...@gmail.com> wrote:

> Thanks for the quick reply, Timo. Ill test with the  row_ts and compaction
> mode suggestions. However, ive read somewhere in the archives that the
> append only stream is only possible if i extract "the first" record from
> the ranking only which in my case is the oldest record.
>
> Regards
>
> On Mon, Feb 8, 2021, 18:56 Timo Walther <tw...@apache.org> wrote:
>
>> Hi,
>>
>> could the problem be that you are mixing OVER and TUMBLE window with
>> each other? The TUMBLE is correctly defined over time attribute `row_ts`
>> but the OVER window is defined using a regular column `upd_ts`. This
>> might be the case why the query is not append-only but updating.
>>
>> Maybe you can split the problem into sub queries and share the plan with
>> us using .explain()?
>>
>> The nulls in upsert-kafka should be gone once you enable compaction mode
>> in Kafka.
>>
>> I hope this helps.
>>
>> Regards,
>> Timo
>>
>>
>> On 08.02.21 10:53, Khachatryan Roman wrote:
>> > Hi,
>> >
>> > AFAIK this should be supported in 1.12 via FLINK-19568 [1]
>> > I'm pulling in Timo and Jark who might know better.
>> >
>> > https://issues.apache.org/jira/browse/FLINK-19857
>> > <https://issues.apache.org/jira/browse/FLINK-19857>
>> >
>> > Regards,
>> > Roman
>> >
>> >
>> > On Mon, Feb 8, 2021 at 9:14 AM meneldor <meneldor@gmail.com
>> > <ma...@gmail.com>> wrote:
>> >
>> >     Any help please? Is there a way to use the "Last row" from a
>> >     deduplication in an append-only stream or tell upsert-kafka to not
>> >     produce *null* records in the sink?
>> >
>> >     Thank you
>> >
>> >     On Thu, Feb 4, 2021 at 1:22 PM meneldor <meneldor@gmail.com
>> >     <ma...@gmail.com>> wrote:
>> >
>> >         Hello,
>> >         Flink 1.12.1(pyflink)
>> >         I am deduplicating CDC records coming from Maxwell in a kafka
>> >         topic.  Here is the SQL:
>> >
>> >             CREATE TABLE stats_topic(
>> >                        `data` ROW<`id` BIGINT, `account` INT, `upd_ts`
>> >             BIGINT>,
>> >                        `ts` BIGINT,
>> >                        `xid` BIGINT ,
>> >                        row_ts AS
>> TO_TIMESTAMP(FROM_UNIXTIME(data.upd_ts)),
>> >                        WATERMARK FOR `row_ts` AS `row_ts`   - INTERVAL
>> >             '15' SECOND
>> >                      ) WITH (
>> >                        'connector' = 'kafka',
>> >                        'format' = 'json',
>> >                        'topic' = 'stats_topic',
>> >                        'properties.bootstrap.servers' =
>> 'localhost:9092',
>> >                        'properties.group.id
>> >             <http://properties.group.id>' = 'test_group'
>> >                      )
>> >
>> >             CREATE TABLE sink_table(
>> >                        `id` BIGINT,
>> >                        `account` INT,
>> >                        `upd_ts` BIGINT
>> >                      ) WITH (
>> >                        'connector' = 'kafka',
>> >                        'format' = 'json',
>> >                        'topic' = 'sink_topic',
>> >                        'properties.bootstrap.servers' =
>> 'localhost:9092',
>> >                        'properties.group.id
>> >             <http://properties.group.id>' = 'test_group'
>> >                      )
>> >
>> >
>> >             INSERT INTO sink_table
>> >             SELECT
>> >             id,
>> >             account,
>> >             upd_ts
>> >             FROM (
>> >             SELECT
>> >               id,
>> >               account,
>> >               upd_ts,
>> >               ROW_NUMBER() OVER (PARTITION BY id ORDER BY upd_ts desc)
>> >             AS rownum
>> >             FROM stats_topic
>> >             GROUP BY id, account, upd_ts, TUMBLE(row_ts, INTERVAL '20'
>> >             MINUTE)
>> >             )
>> >             WHERE rownum=1
>> >
>> >
>> >           As there are a lot of CDC records for a single ID im using
>> >         ROW_NUMBER() and produce them on a 20 minutes interval to the
>> >         sink_topic. The problem is that flink doesnt allow me to use it
>> >         in combination with with the kafka connector:
>> >
>> >             pyflink.util.exceptions.TableException: Table sink
>> >             'default_catalog.default_database.sink_table' doesn't
>> >             support consuming update and delete changes which is
>> >             produced by node Rank(strategy=[UndefinedStrategy],
>> >             rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1],
>> >             partitionBy=[$f0], orderBy=[$f2 DESC], select=[$f0, $f1,
>> $f2])
>> >
>> >
>> >         If I use the*upsert-kafka* connector everything is fine but then
>> >         i receive empty JSON records in the sink topic:
>> >
>> >             {"id": 111111, "account": 4, "upd_ts": 1612334952}
>> >             {"id": 222222, "account": 4, "upd_ts": 1612334953}
>> >             {}
>> >             {"id": 333333, "account": 4, "upd_ts": 1612334955}
>> >             {}
>> >             {"id": 444444, "account": 4, "upd_ts": 1612334956}
>> >
>> >
>> >         Thank you!
>> >
>>
>>

Re: Kafka connector doesn't support consuming update and delete changes in Table SQL API

Posted by meneldor <me...@gmail.com>.
Thanks for the quick reply, Timo. Ill test with the  row_ts and compaction
mode suggestions. However, ive read somewhere in the archives that the
append only stream is only possible if i extract "the first" record from
the ranking only which in my case is the oldest record.

Regards

On Mon, Feb 8, 2021, 18:56 Timo Walther <tw...@apache.org> wrote:

> Hi,
>
> could the problem be that you are mixing OVER and TUMBLE window with
> each other? The TUMBLE is correctly defined over time attribute `row_ts`
> but the OVER window is defined using a regular column `upd_ts`. This
> might be the case why the query is not append-only but updating.
>
> Maybe you can split the problem into sub queries and share the plan with
> us using .explain()?
>
> The nulls in upsert-kafka should be gone once you enable compaction mode
> in Kafka.
>
> I hope this helps.
>
> Regards,
> Timo
>
>
> On 08.02.21 10:53, Khachatryan Roman wrote:
> > Hi,
> >
> > AFAIK this should be supported in 1.12 via FLINK-19568 [1]
> > I'm pulling in Timo and Jark who might know better.
> >
> > https://issues.apache.org/jira/browse/FLINK-19857
> > <https://issues.apache.org/jira/browse/FLINK-19857>
> >
> > Regards,
> > Roman
> >
> >
> > On Mon, Feb 8, 2021 at 9:14 AM meneldor <meneldor@gmail.com
> > <ma...@gmail.com>> wrote:
> >
> >     Any help please? Is there a way to use the "Last row" from a
> >     deduplication in an append-only stream or tell upsert-kafka to not
> >     produce *null* records in the sink?
> >
> >     Thank you
> >
> >     On Thu, Feb 4, 2021 at 1:22 PM meneldor <meneldor@gmail.com
> >     <ma...@gmail.com>> wrote:
> >
> >         Hello,
> >         Flink 1.12.1(pyflink)
> >         I am deduplicating CDC records coming from Maxwell in a kafka
> >         topic.  Here is the SQL:
> >
> >             CREATE TABLE stats_topic(
> >                        `data` ROW<`id` BIGINT, `account` INT, `upd_ts`
> >             BIGINT>,
> >                        `ts` BIGINT,
> >                        `xid` BIGINT ,
> >                        row_ts AS
> TO_TIMESTAMP(FROM_UNIXTIME(data.upd_ts)),
> >                        WATERMARK FOR `row_ts` AS `row_ts`   - INTERVAL
> >             '15' SECOND
> >                      ) WITH (
> >                        'connector' = 'kafka',
> >                        'format' = 'json',
> >                        'topic' = 'stats_topic',
> >                        'properties.bootstrap.servers' = 'localhost:9092',
> >                        'properties.group.id
> >             <http://properties.group.id>' = 'test_group'
> >                      )
> >
> >             CREATE TABLE sink_table(
> >                        `id` BIGINT,
> >                        `account` INT,
> >                        `upd_ts` BIGINT
> >                      ) WITH (
> >                        'connector' = 'kafka',
> >                        'format' = 'json',
> >                        'topic' = 'sink_topic',
> >                        'properties.bootstrap.servers' = 'localhost:9092',
> >                        'properties.group.id
> >             <http://properties.group.id>' = 'test_group'
> >                      )
> >
> >
> >             INSERT INTO sink_table
> >             SELECT
> >             id,
> >             account,
> >             upd_ts
> >             FROM (
> >             SELECT
> >               id,
> >               account,
> >               upd_ts,
> >               ROW_NUMBER() OVER (PARTITION BY id ORDER BY upd_ts desc)
> >             AS rownum
> >             FROM stats_topic
> >             GROUP BY id, account, upd_ts, TUMBLE(row_ts, INTERVAL '20'
> >             MINUTE)
> >             )
> >             WHERE rownum=1
> >
> >
> >           As there are a lot of CDC records for a single ID im using
> >         ROW_NUMBER() and produce them on a 20 minutes interval to the
> >         sink_topic. The problem is that flink doesnt allow me to use it
> >         in combination with with the kafka connector:
> >
> >             pyflink.util.exceptions.TableException: Table sink
> >             'default_catalog.default_database.sink_table' doesn't
> >             support consuming update and delete changes which is
> >             produced by node Rank(strategy=[UndefinedStrategy],
> >             rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1],
> >             partitionBy=[$f0], orderBy=[$f2 DESC], select=[$f0, $f1,
> $f2])
> >
> >
> >         If I use the*upsert-kafka* connector everything is fine but then
> >         i receive empty JSON records in the sink topic:
> >
> >             {"id": 111111, "account": 4, "upd_ts": 1612334952}
> >             {"id": 222222, "account": 4, "upd_ts": 1612334953}
> >             {}
> >             {"id": 333333, "account": 4, "upd_ts": 1612334955}
> >             {}
> >             {"id": 444444, "account": 4, "upd_ts": 1612334956}
> >
> >
> >         Thank you!
> >
>
>

Re: Kafka connector doesn't support consuming update and delete changes in Table SQL API

Posted by Timo Walther <tw...@apache.org>.
Hi,

could the problem be that you are mixing OVER and TUMBLE window with 
each other? The TUMBLE is correctly defined over time attribute `row_ts` 
but the OVER window is defined using a regular column `upd_ts`. This 
might be the case why the query is not append-only but updating.

Maybe you can split the problem into sub queries and share the plan with 
us using .explain()?

The nulls in upsert-kafka should be gone once you enable compaction mode 
in Kafka.

I hope this helps.

Regards,
Timo


On 08.02.21 10:53, Khachatryan Roman wrote:
> Hi,
> 
> AFAIK this should be supported in 1.12 via FLINK-19568 [1]
> I'm pulling in Timo and Jark who might know better.
> 
> https://issues.apache.org/jira/browse/FLINK-19857 
> <https://issues.apache.org/jira/browse/FLINK-19857>
> 
> Regards,
> Roman
> 
> 
> On Mon, Feb 8, 2021 at 9:14 AM meneldor <meneldor@gmail.com 
> <ma...@gmail.com>> wrote:
> 
>     Any help please? Is there a way to use the "Last row" from a
>     deduplication in an append-only stream or tell upsert-kafka to not
>     produce *null* records in the sink?
> 
>     Thank you
> 
>     On Thu, Feb 4, 2021 at 1:22 PM meneldor <meneldor@gmail.com
>     <ma...@gmail.com>> wrote:
> 
>         Hello,
>         Flink 1.12.1(pyflink)
>         I am deduplicating CDC records coming from Maxwell in a kafka
>         topic.  Here is the SQL:
> 
>             CREATE TABLE stats_topic(
>                        `data` ROW<`id` BIGINT, `account` INT, `upd_ts`
>             BIGINT>,
>                        `ts` BIGINT,
>                        `xid` BIGINT ,
>                        row_ts AS TO_TIMESTAMP(FROM_UNIXTIME(data.upd_ts)),
>                        WATERMARK FOR `row_ts` AS `row_ts`   - INTERVAL
>             '15' SECOND
>                      ) WITH (
>                        'connector' = 'kafka',
>                        'format' = 'json',
>                        'topic' = 'stats_topic',
>                        'properties.bootstrap.servers' = 'localhost:9092',
>                        'properties.group.id
>             <http://properties.group.id>' = 'test_group'
>                      )
> 
>             CREATE TABLE sink_table(
>                        `id` BIGINT,
>                        `account` INT,
>                        `upd_ts` BIGINT
>                      ) WITH (
>                        'connector' = 'kafka',
>                        'format' = 'json',
>                        'topic' = 'sink_topic',
>                        'properties.bootstrap.servers' = 'localhost:9092',
>                        'properties.group.id
>             <http://properties.group.id>' = 'test_group'
>                      )
> 
> 
>             INSERT INTO sink_table
>             SELECT
>             id,
>             account,
>             upd_ts
>             FROM (
>             SELECT
>               id,
>               account,
>               upd_ts,
>               ROW_NUMBER() OVER (PARTITION BY id ORDER BY upd_ts desc)
>             AS rownum
>             FROM stats_topic
>             GROUP BY id, account, upd_ts, TUMBLE(row_ts, INTERVAL '20'
>             MINUTE)
>             )
>             WHERE rownum=1
> 
> 
>           As there are a lot of CDC records for a single ID im using
>         ROW_NUMBER() and produce them on a 20 minutes interval to the
>         sink_topic. The problem is that flink doesnt allow me to use it
>         in combination with with the kafka connector:
> 
>             pyflink.util.exceptions.TableException: Table sink
>             'default_catalog.default_database.sink_table' doesn't
>             support consuming update and delete changes which is
>             produced by node Rank(strategy=[UndefinedStrategy],
>             rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1],
>             partitionBy=[$f0], orderBy=[$f2 DESC], select=[$f0, $f1, $f2])
> 
> 
>         If I use the*upsert-kafka* connector everything is fine but then
>         i receive empty JSON records in the sink topic:
> 
>             {"id": 111111, "account": 4, "upd_ts": 1612334952}
>             {"id": 222222, "account": 4, "upd_ts": 1612334953}
>             {}
>             {"id": 333333, "account": 4, "upd_ts": 1612334955}
>             {}
>             {"id": 444444, "account": 4, "upd_ts": 1612334956}
> 
> 
>         Thank you!
> 


Re: Kafka connector doesn't support consuming update and delete changes in Table SQL API

Posted by Khachatryan Roman <kh...@gmail.com>.
Hi,

AFAIK this should be supported in 1.12 via FLINK-19568 [1]
I'm pulling in Timo and Jark who might know better.

https://issues.apache.org/jira/browse/FLINK-19857

Regards,
Roman


On Mon, Feb 8, 2021 at 9:14 AM meneldor <me...@gmail.com> wrote:

> Any help please? Is there a way to use the "Last row" from a deduplication
> in an append-only stream or tell upsert-kafka to not produce *null*
> records in the sink?
>
> Thank you
>
> On Thu, Feb 4, 2021 at 1:22 PM meneldor <me...@gmail.com> wrote:
>
>> Hello,
>> Flink 1.12.1(pyflink)
>> I am deduplicating CDC records coming from Maxwell in a kafka topic.
>> Here is the SQL:
>>
>> CREATE TABLE stats_topic(
>>>           `data` ROW<`id` BIGINT, `account` INT, `upd_ts` BIGINT>,
>>>           `ts` BIGINT,
>>>           `xid` BIGINT ,
>>>           row_ts AS TO_TIMESTAMP(FROM_UNIXTIME(data.upd_ts)),
>>>           WATERMARK FOR `row_ts` AS `row_ts`   - INTERVAL '15' SECOND
>>>         ) WITH (
>>>           'connector' = 'kafka',
>>>           'format' = 'json',
>>>           'topic' = 'stats_topic',
>>>           'properties.bootstrap.servers' = 'localhost:9092',
>>>           'properties.group.id' = 'test_group'
>>>         )
>>>
>>> CREATE TABLE sink_table(
>>>           `id` BIGINT,
>>>           `account` INT,
>>>           `upd_ts` BIGINT
>>>         ) WITH (
>>>           'connector' = 'kafka',
>>>           'format' = 'json',
>>>           'topic' = 'sink_topic',
>>>           'properties.bootstrap.servers' = 'localhost:9092',
>>>           'properties.group.id' = 'test_group'
>>>         )
>>>
>>>
>>> INSERT INTO sink_table
>>> SELECT
>>> id,
>>> account,
>>> upd_ts
>>> FROM (
>>> SELECT
>>>  id,
>>>  account,
>>>  upd_ts,
>>>  ROW_NUMBER() OVER (PARTITION BY id ORDER BY upd_ts desc) AS rownum
>>> FROM stats_topic
>>> GROUP BY id, account, upd_ts, TUMBLE(row_ts, INTERVAL '20' MINUTE)
>>> )
>>> WHERE rownum=1
>>>
>>
>>  As there are a lot of CDC records for a single ID im using ROW_NUMBER()
>> and produce them on a 20 minutes interval to the sink_topic. The problem is
>> that flink doesnt allow me to use it in combination with with the kafka
>> connector:
>>
>>> pyflink.util.exceptions.TableException: Table sink
>>> 'default_catalog.default_database.sink_table' doesn't support consuming
>>> update and delete changes which is produced by node
>>> Rank(strategy=[UndefinedStrategy], rankType=[ROW_NUMBER],
>>> rankRange=[rankStart=1, rankEnd=1], partitionBy=[$f0], orderBy=[$f2 DESC],
>>> select=[$f0, $f1, $f2])
>>>
>>
>> If I use the* upsert-kafka* connector everything is fine but then i
>> receive empty JSON records in the sink topic:
>>
>>> {"id": 111111, "account": 4, "upd_ts": 1612334952}
>>> {"id": 222222, "account": 4, "upd_ts": 1612334953}
>>> {}
>>> {"id": 333333, "account": 4, "upd_ts": 1612334955}
>>> {}
>>> {"id": 444444, "account": 4, "upd_ts": 1612334956}
>>>
>>
>> Thank you!
>>
>

Re: Kafka connector doesn't support consuming update and delete changes in Table SQL API

Posted by meneldor <me...@gmail.com>.
Any help please? Is there a way to use the "Last row" from a deduplication
in an append-only stream or tell upsert-kafka to not produce *null* records
in the sink?

Thank you

On Thu, Feb 4, 2021 at 1:22 PM meneldor <me...@gmail.com> wrote:

> Hello,
> Flink 1.12.1(pyflink)
> I am deduplicating CDC records coming from Maxwell in a kafka topic.  Here
> is the SQL:
>
> CREATE TABLE stats_topic(
>>           `data` ROW<`id` BIGINT, `account` INT, `upd_ts` BIGINT>,
>>           `ts` BIGINT,
>>           `xid` BIGINT ,
>>           row_ts AS TO_TIMESTAMP(FROM_UNIXTIME(data.upd_ts)),
>>           WATERMARK FOR `row_ts` AS `row_ts`   - INTERVAL '15' SECOND
>>         ) WITH (
>>           'connector' = 'kafka',
>>           'format' = 'json',
>>           'topic' = 'stats_topic',
>>           'properties.bootstrap.servers' = 'localhost:9092',
>>           'properties.group.id' = 'test_group'
>>         )
>>
>> CREATE TABLE sink_table(
>>           `id` BIGINT,
>>           `account` INT,
>>           `upd_ts` BIGINT
>>         ) WITH (
>>           'connector' = 'kafka',
>>           'format' = 'json',
>>           'topic' = 'sink_topic',
>>           'properties.bootstrap.servers' = 'localhost:9092',
>>           'properties.group.id' = 'test_group'
>>         )
>>
>>
>> INSERT INTO sink_table
>> SELECT
>> id,
>> account,
>> upd_ts
>> FROM (
>> SELECT
>>  id,
>>  account,
>>  upd_ts,
>>  ROW_NUMBER() OVER (PARTITION BY id ORDER BY upd_ts desc) AS rownum
>> FROM stats_topic
>> GROUP BY id, account, upd_ts, TUMBLE(row_ts, INTERVAL '20' MINUTE)
>> )
>> WHERE rownum=1
>>
>
>  As there are a lot of CDC records for a single ID im using ROW_NUMBER()
> and produce them on a 20 minutes interval to the sink_topic. The problem is
> that flink doesnt allow me to use it in combination with with the kafka
> connector:
>
>> pyflink.util.exceptions.TableException: Table sink
>> 'default_catalog.default_database.sink_table' doesn't support consuming
>> update and delete changes which is produced by node
>> Rank(strategy=[UndefinedStrategy], rankType=[ROW_NUMBER],
>> rankRange=[rankStart=1, rankEnd=1], partitionBy=[$f0], orderBy=[$f2 DESC],
>> select=[$f0, $f1, $f2])
>>
>
> If I use the* upsert-kafka* connector everything is fine but then i
> receive empty JSON records in the sink topic:
>
>> {"id": 111111, "account": 4, "upd_ts": 1612334952}
>> {"id": 222222, "account": 4, "upd_ts": 1612334953}
>> {}
>> {"id": 333333, "account": 4, "upd_ts": 1612334955}
>> {}
>> {"id": 444444, "account": 4, "upd_ts": 1612334956}
>>
>
> Thank you!
>