Posted to dev@flink.apache.org by Alexander Sorokoumov <as...@confluent.io.INVALID> on 2022/12/17 01:28:02 UTC

Streaming queries in FTS using Kafka log

Hello community,

I want to ask about streaming queries with Flink Table Store. After reading
the documentation on Streaming Queries [1], I was under the impression that
the normalization step is needed only for tables with No Changelog Producer
and no external log system, since with LogStore-over-TableStore the Kafka
log already carries the `before` values.
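
To illustrate what I mean, for a key whose count goes from 1 to 2, a log
with `before` values carries the retraction explicitly, while an upsert-only
log leaves it to the reader to reconstruct (rows in Flink's RowKind
notation; the values are made up for illustration):

-- 'log.changelog-mode' = 'all', `before` values present:
+I[hello, 1]    -U[hello, 1]    +U[hello, 2]

-- upsert-only log, no `before` values:
+I[hello, 1]    +U[hello, 2]    -- reader must derive -U[hello, 1] itself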

However, when I created the following table:

CREATE TABLE word_count (
     word STRING PRIMARY KEY NOT ENFORCED,
     cnt BIGINT
) WITH (
     'connector' = 'table-store',
     'path' = 's3://my-bucket/table-store',
     'log.system' = 'kafka',
     'kafka.bootstrap.servers' = 'broker:9092',
     'kafka.topic' = 'word_count_log',
     'auto-create' = 'true',
     'log.changelog-mode' = 'all',
     'log.consistency' = 'eventual'
);

And ran a streaming query against it:

SELECT * FROM word_count;

The topology for this query had the normalization task (ChangelogNormalize).
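
For anyone reproducing this, EXPLAIN on the same query makes the operator
visible; the plan below is abridged and only meant to show the node in
question:

EXPLAIN SELECT * FROM word_count;

-- == Optimized Physical Plan == (abridged)
-- ...
-- ChangelogNormalize(key=[word])
-- ...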

Is this a bug or expected behavior? If it is the latter, can you please
clarify why this is the case?

1.
https://nightlies.apache.org/flink/flink-table-store-docs-master/docs/development/streaming-query/

Thank you,
Alex

Re: Streaming queries in FTS using Kafka log

Posted by Alexander Sorokoumov <as...@confluent.io.INVALID>.
Hello everyone,

Answering my own question: it turns out that Flink Table Store removes the
normalization node when reading from an external log system only if
'log.changelog-mode' = 'all' and 'log.consistency' = 'transactional' [1].
Since my table was declared with 'log.consistency' = 'eventual', the
ChangelogNormalize step is expected behavior.

1.
https://github.com/apache/flink-table-store/blob/7e0d55ff3dc9fd48455b17d9a439647b0554d020/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java#L136-L141
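
For completeness, here is a sketch of the DDL that should stream without the
normalization step under that condition. It reuses the placeholder bucket,
broker, and topic names from my first message; as far as I can tell,
'transactional' is also the default for 'log.consistency':

CREATE TABLE word_count (
     word STRING PRIMARY KEY NOT ENFORCED,
     cnt BIGINT
) WITH (
     'connector' = 'table-store',
     'path' = 's3://my-bucket/table-store',
     'log.system' = 'kafka',
     'kafka.bootstrap.servers' = 'broker:9092',
     'kafka.topic' = 'word_count_log',
     'auto-create' = 'true',
     'log.changelog-mode' = 'all',
     -- 'transactional' (unlike 'eventual') lets the source trust the log and
     -- skip ChangelogNormalize; the trade-off is that changes only become
     -- visible to read-committed consumers after a checkpoint completes
     'log.consistency' = 'transactional'
);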

Best,
Alex

