Posted to user-zh@flink.apache.org by op <52...@qq.com> on 2020/07/30 02:40:55 UTC

Re: Error writing aggregated data to a Kafka table with SQL

OK, thanks. In that case, can we still define a custom connector type, the way we could in 1.10?


------------------ Original Message ------------------
From: "user-zh" <imjark@gmail.com>
Sent: Thursday, July 30, 2020, 10:39 AM
To: "user-zh" <user-zh@flink.apache.org>
Subject: Re: Error writing aggregated data to a Kafka table with SQL



Sorry, the Canal and Debezium formats provided in 1.11 do not support sinks yet. This is expected to land in 1.12.
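For readers finding this thread later: once changelog formats gained sink support (Flink 1.12+), the suggestion above can be sketched as a Kafka sink table declared with a changelog format such as `debezium-json`, so that an aggregating query may write update and delete changes into it. The table, topic, and field names below are hypothetical, chosen to echo the error message in this thread, not taken from the poster's actual job:

```sql
-- Hypothetical sink table (names assumed for illustration).
-- With Flink 1.12+, the 'kafka' connector paired with a changelog
-- format like 'debezium-json' can consume update/delete changes.
CREATE TABLE mvp_rtdwb_user_business_sink (
  dt STRING,
  user_id BIGINT,
  text_feed_count BIGINT,
  event_time TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_business_agg',                 -- assumed topic name
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'debezium-json'                     -- encodes updates/deletes as changelog records
);

-- The GROUP BY aggregation produces update changes; the changelog
-- format lets the Kafka sink accept them instead of throwing
-- "doesn't support consuming update changes".
INSERT INTO mvp_rtdwb_user_business_sink
SELECT dt, user_id, SUM(text_feeds) AS text_feed_count, MAX(ts) AS event_time
FROM user_events                                 -- assumed source table
GROUP BY dt, user_id;
```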

On Wed, 29 Jul 2020 at 12:51, Benchao Li <libenchao@apache.org> wrote:

> You can use the Canal or Debezium format to write into Kafka; those formats support update and delete messages.
>
> op <520075694@qq.com> wrote on Wednesday, July 29, 2020, 11:59 AM:
>
> > As shown below, I want to write aggregated results directly into Kafka with SQL, on version 1.11. Is there any way to solve this, or is converting to a DataStream the only option?
> >
> > Thanks
> >
> > Exception in thread "main" org.apache.flink.table.api.TableException:
> > Table sink 'default_catalog.default_database.mvp_rtdwb_user_business'
> > doesn't support consuming update changes which is produced by node
> > GroupAggregate(groupBy=[dt, user_id], select=[dt, user_id, SUM($f2) AS
> > text_feed_count, SUM($f3) AS picture_feed_count, SUM($f4) AS
> > be_comment_forward_user_count, SUM($f5) AS share_link_count, SUM($f6) AS
> > share_music_count, SUM($f7) AS share_video_count, SUM($f8) AS follow_count,
> > SUM($f9) AS direct_post_count, SUM($f10) AS comment_post_count, SUM($f11)
> > AS comment_count, SUM($f12) AS fans_count, MAX(event_time) AS event_time])
>
> --
>
> Best,
> Benchao Li
>