Posted to user@flink.apache.org by chi ma <ma...@gmail.com> on 2018/04/09 06:43:39 UTC

How to output a Table to Kafka?

Hi,

I'm a newbie to Flink. I'm trying to load data from HDFS and analyze it
using Flink Table APIs.

I created a TableSource, registered it in the StreamTableEnvironment
as a table, ran a SQL query on that table through
streamTableEnvironment.sqlQuery, and finally called writeToSink to write
the result to a Kafka010JsonTableSink.

But, I got an exception, "Exception in thread "main"
org.apache.flink.table.api.TableException: AppendStreamTableSink requires
that Table has only insert changes."

I've googled it. The probable cause is that I put a "group by" clause in
the SQL statement, which makes the result a retract table.

If I pass a SQL statement without a "group by" clause to sqlQuery,
everything works, and I can see the result in the Kafka topic.

So, my questions are:
   1. What is the typical way to output a retract table?
   2. Is there a unified way to handle output for both append-only and
non-append-only tables?

Thanks.
Best Regards~

Re: How to output a Table to Kafka?

Posted by Hequn Cheng <ch...@gmail.com>.
Hi chi ma,

A query with "group by" produces an updating table, i.e. a table whose
rows change over time instead of only being appended. Both
UpsertStreamTableSink and RetractStreamTableSink can be used to emit such
a table. The main difference is that RetractStreamTableSink encodes an
UPDATE change as two messages, a retract message for the previous row and
an add message for the new row, while UpsertStreamTableSink encodes an
UPDATE change as a single message and is hence more efficient. More
details can be found here [1][2].
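
To make the difference concrete, here is a minimal plain-Java simulation of
the two encodings for a "SELECT word, COUNT(*) ... GROUP BY word" style
query. It does not use Flink at all: the class and method names are
hypothetical, and the (flag, row) strings only mimic the Tuple2<Boolean, T>
records the two sink types receive.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation for illustration; none of these names are Flink APIs.
public class ChangeEncodingSketch {

    // Retract style: an UPDATE is emitted as "retract old row" then "add new row".
    public static List<String> retractEncode(List<String> words) {
        Map<String, Long> counts = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (String w : words) {
            Long old = counts.get(w);
            if (old != null) {
                // false = retract the previously emitted row
                out.add("(false, " + w + ", " + old + ")");
            }
            long updated = (old == null ? 0L : old) + 1;
            counts.put(w, updated);
            // true = add the new row
            out.add("(true, " + w + ", " + updated + ")");
        }
        return out;
    }

    // Upsert style: the grouping key acts as a unique key, so an INSERT or
    // UPDATE is a single message that overwrites the row for that key.
    public static List<String> upsertEncode(List<String> words) {
        Map<String, Long> counts = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (String w : words) {
            long updated = counts.merge(w, 1L, Long::sum);
            // true = upsert the row for key w
            out.add("(true, " + w + ", " + updated + ")");
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("a", "b", "a");
        System.out.println("retract: " + retractEncode(words));
        System.out.println("upsert:  " + upsertEncode(words));
    }
}
```

For the input a, b, a the retract encoding emits four messages (the second
"a" costs a retract plus an add) while the upsert encoding emits three.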

Both UpsertStreamTableSink and RetractStreamTableSink can emit either an
append-only table or an updating table, whereas AppendStreamTableSink can
only emit an append-only table.
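
As a rough illustration of why the retract format covers both cases, the
sketch below applies (flag, row) messages to rebuild the current result.
It is plain Java with hypothetical names, not a Flink sink implementation:
an append-only table simply shows up as a stream in which every flag is
true.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration; the names here are hypothetical, not Flink APIs.
public class RetractConsumerSketch {

    // Applies (flag, row) messages: true adds one copy of the row,
    // false removes one copy. Returns row -> number of copies.
    public static Map<String, Integer> materialize(
            List<SimpleEntry<Boolean, String>> changes) {
        Map<String, Integer> rows = new TreeMap<>();
        for (SimpleEntry<Boolean, String> c : changes) {
            int delta = c.getKey() ? 1 : -1;
            rows.merge(c.getValue(), delta, Integer::sum);
            if (rows.get(c.getValue()) == 0) {
                rows.remove(c.getValue()); // row fully retracted
            }
        }
        return rows;
    }

    // Small helper to build one message.
    public static SimpleEntry<Boolean, String> msg(boolean flag, String row) {
        return new SimpleEntry<>(flag, row);
    }

    public static void main(String[] args) {
        // Append-only result: every message is an add.
        System.out.println(materialize(Arrays.asList(
                msg(true, "a,1"), msg(true, "b,1"))));
        // Updating result: the count for "a" changes from 1 to 2.
        System.out.println(materialize(Arrays.asList(
                msg(true, "a,1"), msg(true, "b,1"),
                msg(false, "a,1"), msg(true, "a,2"))));
    }
}
```

The same consumer logic handles both inputs, which is the sense in which a
retract-style output is a unified format; the cost is the extra retract
message on each update.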

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/streaming.html

[2] https://flink.apache.org/news/2017/04/04/dynamic-tables.html

