You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by Benchao Li <li...@apache.org> on 2020/07/17 12:18:14 UTC

Re: flink-1.11 KafkaDynamicTableSouce groupBy 结果怎样发送到 kafka

DynamicTableSink有一个方法是getChangelogMode,可以通过这个方法来指定这个sink接收什么种类的数据

wanglei2@geekplus.com.cn <wa...@geekplus.com.cn> 于2020年7月17日周五 下午1:02写道:

>
>  INSERT INTO kafka_dws_artemis_out_order select warehouse_id, count(*)
> from kafka_ods_artemis_out_order group by warehouse_id;
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.api.TableException: Table sink
> 'myhive.wanglei.kafka_dws_artemis_out_order' doesn't support consuming
> update changes which is produced by node
> GroupAggregate(groupBy=[warehouse_id], select=[warehouse_id, COUNT(*) AS
> EXPR$1])
>
> 在 Flink-1.10 中可以更改 KafkaTableSinkBase 让它 implements RetractStream 实现。
>
> 我看现在 Flink-1.11 中是用了  KafkaDynamicSource, KafkaDynamicSink,这样怎样改动才能让
> GroupBy 的结果也发送到 Kafka 呢?
>
> 谢谢,
> 王磊
>
>
> wanglei2@geekplus.com.cn
>
>

-- 

Best,
Benchao Li

Re: Re: flink-1.11 KafkaDynamicTableSouce groupBy 结果怎样发送到 kafka

Posted by "wanglei2@geekplus.com.cn" <wa...@geekplus.com.cn>.
谢谢,我直接更改了 KafkaDynamicSinkBase 的 getChangelogMode 方法, 是可以实现目的的。

更改前:
public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
    return this.encodingFormat.getChangelogMode();
}
更改后:public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
      return ChangelogMode.newBuilder()
         .addContainedKind(RowKind.INSERT)
         .addContainedKind(RowKind.UPDATE_AFTER)
         .build();
   }而且这样更改以后 UPDATE_BEFORE 的记录被过滤掉了,没有被发送到 Kafka
谢谢,王磊


wanglei2@geekplus.com.cn 

 
Sender: Benchao Li
Send Time: 2020-07-17 20:18
Receiver: user-zh
Subject: Re: flink-1.11 KafkaDynamicTableSouce groupBy 结果怎样发送到 kafka
DynamicTableSink有一个方法是getChangelogMode,可以通过这个方法来指定这个sink接收什么种类的数据
 
wanglei2@geekplus.com.cn <wa...@geekplus.com.cn> 于2020年7月17日周五 下午1:02写道:
 
>
>  INSERT INTO kafka_dws_artemis_out_order select warehouse_id, count(*)
> from kafka_ods_artemis_out_order group by warehouse_id;
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.api.TableException: Table sink
> 'myhive.wanglei.kafka_dws_artemis_out_order' doesn't support consuming
> update changes which is produced by node
> GroupAggregate(groupBy=[warehouse_id], select=[warehouse_id, COUNT(*) AS
> EXPR$1])
>
> 在 Flink-1.10 中可以更改 KafkaTableSinkBase 让它 implements RetractStream 实现。
>
> 我看现在 Flink-1.11 中是用了  KafkaDynamicSource, KafkaDynamicSink,这样怎样改动才能让
> GroupBy 的结果也发送到 Kafka 呢?
>
> 谢谢,
> 王磊
>
>
> wanglei2@geekplus.com.cn
>
>
 
-- 
 
Best,
Benchao Li