Posted to user-zh@flink.apache.org by "799590989@qq.com.INVALID" <79...@qq.com.INVALID> on 2022/04/06 06:36:40 UTC

How do I set a Hive sink table to upsert mode using pure SQL in Flink 1.13.6?

Question: how do I set a Hive sink table to upsert mode using pure SQL in Flink 1.13.6?

Flink: 1.13.6
Hive: 1.1.1
Hadoop: 2.6.0-cdh5.16.2

Using pure SQL with Kafka as the source, the intermediate transformations include DISTINCT or GROUP BY operations. When sinking the computed result to a Hive table, the job fails with the error below:

doesn't support consuming update changes which is produced by node GroupAggregate(groupBy=[id, user_id, status, EXPR$3]

Answers found online say the sink table needs to be set to upsert mode. I tried creating the sink table as shown below; the CREATE TABLE succeeds, but submitting the INSERT INTO still fails with the same error.

Source table:

CREATE TABLE data_2432_5074_model (
    id STRING,
    user_id STRING,
    status STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'person',
    'properties.bootstrap.servers' = '192.168.9.116:9092',
    'properties.group.id' = 'chinaoly-group',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json',
    'json.fail-on-missing-field' = 'false',
    'json.ignore-parse-errors' = 'true'
)
Sink table:

CREATE TABLE output_2432_5076_model_1649226175146 (
    id STRING,
    user_id STRING,
    status STRING,
    my_dt TIMESTAMP
) TBLPROPERTIES (
    'streaming-source.enable' = 'true',
    'streaming-source.partition.include' = 'all',
    'streaming-source.partition-order' = 'create-time',
    'sink.partition-commit.watermark-time-zone' = 'Asia/Shanghai',
    'sink.partition-commit.policy.kind' = 'metastore,success-file',
    'write.upsert.enable' = 'true',
    'streaming-source.monitor-interval' = '1 min'
)

Transformation logic:
INSERT INTO output_2432_5076_model_1649226175146
SELECT DISTINCT id, user_id, status, PROCTIME()
FROM (SELECT * FROM data_2432_5074_model)
WHERE status = '1'

Can anyone give me an answer? Thanks in advance.



799590989@qq.com

Re: How do I set a Hive sink table to upsert mode using pure SQL in Flink 1.13.6?

Posted by LuNing Wang <wa...@gmail.com>.
Hi,

The Hive sink does not support upsert writes, only INSERT (append-only) writes, so no table setting will make it work. In this situation, Hudi or Iceberg is typically used as the sink to accept upsert data.
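
For reference, a minimal sketch of an Iceberg upsert sink. This assumes an Iceberg Flink runtime jar compatible with Flink 1.13 is on the classpath; the catalog name, table name, metastore URI, and warehouse path below are placeholders, not values from your setup:

-- Register an Iceberg catalog backed by the Hive Metastore
-- ('uri' and 'warehouse' are placeholder values).
CREATE CATALOG iceberg_catalog WITH (
    'type' = 'iceberg',
    'catalog-type' = 'hive',
    'uri' = 'thrift://192.168.9.116:9083',
    'warehouse' = 'hdfs:///user/hive/warehouse'
);

-- Iceberg upsert writes require a format-version 2 table
-- with a declared (NOT ENFORCED) primary key.
CREATE TABLE iceberg_catalog.`default`.output_2432_5076_upsert (
    id STRING,
    user_id STRING,
    status STRING,
    my_dt TIMESTAMP,
    PRIMARY KEY (id, user_id, status) NOT ENFORCED
) WITH (
    'format-version' = '2',
    'write.upsert.enabled' = 'true'
);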

Best,
LuNing Wang
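
A related workaround, if the sink must stay a plain Hive table, is to make the query itself append-only: Flink's deduplication pattern that keeps the first row per key, ordered ascending by a processing-time attribute, produces an insert-only stream that the Hive sink can consume. A sketch, assuming the source DDL above is extended with a computed column proc_time AS PROCTIME() (the column name and the CAST are illustrative, not tested against 1.13.6):

-- Keeping the first row per key (ORDER BY proc_time ASC) makes the
-- result insert-only, so no update changes reach the Hive sink.
INSERT INTO output_2432_5076_model_1649226175146
SELECT id, user_id, status, CAST(proc_time AS TIMESTAMP(3))
FROM (
    SELECT *,
           ROW_NUMBER() OVER (
               PARTITION BY id, user_id, status
               ORDER BY proc_time ASC
           ) AS row_num
    FROM data_2432_5074_model
    WHERE status = '1'
)
WHERE row_num = 1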
