You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by JasonLee <17...@163.com> on 2022/07/05 03:33:22 UTC
回复:flink sql解析kafka数据
Hi
解析嵌套 JSON 可以参考这篇文章哈,https://mp.weixin.qq.com/s/KHVUlOsLSHPzCprRWSJYcA
Best
JasonLee
---- 回复的原邮件 ----
| 发件人 | 小昌同学<cc...@163.com> |
| 发送日期 | 2022年06月30日 15:02 |
| 收件人 | user-zh@flink.apache.org<us...@flink.apache.org> |
| 主题 | flink sql解析kafka数据 |
各位大佬 请教一下就是我kafka的数据是这样的嵌套格式 ARRAY里面嵌套了ROW类型 我这边想直接通过flink sql建表语句拿到最里面的字段的值
我百度找到了 'json.infer-schema.flatten-nested-columns.enable'='true' 但是我在客户端执行的时候
发现识别不到这个字段
有大佬遇到我这样的问题嘛 或者有啥其他的解决法子嘛
CREATE TABLE ccc_test_20220630_2
(
trans_number STRING,
end_timestamp STRING,
return_flag STRING,
commodity_type STRING
)
COMMENT '中台交易流水小票头' WITH (
'connector' = 'kafka',
'topic' = 'yh_rme_soc_stream_prod-tlog_header',
'properties.bootstrap.servers' = '****',
'properties.group.id' = 'ccc_test_20220630_2',
'properties.request.timeout.ms' = '60000',
'format' = 'json',
'scan.startup.mode' = 'group-offsets',
-- 'scan.startup.mode' = 'timestamp',
-- 'scan.startup.timestamp-millis' = '1653739200000',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true'
'json.infer-schema.flatten-nested-columns.enable'='true'
);
| |
小昌
|
|
ccc0606fighting@163.com
|
Re: flink sql解析kafka数据
Posted by 林影 <br...@gmail.com>.
Hi, 'json.infer-schema.flatten-nested-columns.enable'='true'
这个参数不是属于社区Flink 的feature,是阿里云的vvr flink引擎才支持的参数。
JasonLee <17...@163.com> 于2022年7月5日周二 11:33写道:
> Hi
> 解析嵌套 JSON 可以参考这篇文章哈,https://mp.weixin.qq.com/s/KHVUlOsLSHPzCprRWSJYcA
>
>
> Best
> JasonLee
>
>
> ---- 回复的原邮件 ----
> | 发件人 | 小昌同学<cc...@163.com> |
> | 发送日期 | 2022年06月30日 15:02 |
> | 收件人 | user-zh@flink.apache.org<us...@flink.apache.org> |
> | 主题 | flink sql解析kafka数据 |
> 各位大佬 请教一下就是我kafka的数据是这样的嵌套格式 ARRAY里面嵌套了ROW类型 我这边想直接通过flink
> sql建表语句拿到最里面的字段的值
> 我百度找到了 'json.infer-schema.flatten-nested-columns.enable'='true'
> 但是我在客户端执行的时候
> 发现识别不到这个字段
> 有大佬遇到我这样的问题嘛 或者有啥其他的解决法子嘛
> CREATE TABLE ccc_test_20220630_2
> (
> trans_number STRING,
> end_timestamp STRING,
> return_flag STRING,
> commodity_type STRING
>
>
> )
> COMMENT '中台交易流水小票头' WITH (
> 'connector' = 'kafka',
> 'topic' = 'yh_rme_soc_stream_prod-tlog_header',
> 'properties.bootstrap.servers' = '****',
> 'properties.group.id' = 'ccc_test_20220630_2',
> 'properties.request.timeout.ms' = '60000',
> 'format' = 'json',
> 'scan.startup.mode' = 'group-offsets',
> -- 'scan.startup.mode' = 'timestamp',
> -- 'scan.startup.timestamp-millis' = '1653739200000',
> 'json.fail-on-missing-field' = 'false',
> 'json.ignore-parse-errors' = 'true'
> 'json.infer-schema.flatten-nested-columns.enable'='true'
> );
>
>
> | |
> 小昌
> |
> |
> ccc0606fighting@163.com
> |