You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 小昌同学 <cc...@163.com> on 2022/06/30 07:02:55 UTC
flink sql解析kafka数据
各位大佬 请教一下就是我kafka的数据是这样的嵌套格式 ARRAY里面嵌套了ROW类型 我这边想直接通过flink sql建表语句拿到最里面的字段的值
我百度找到了 'json.infer-schema.flatten-nested-columns.enable'='true' 但是我在客户端执行的时候
发现识别不到这个字段
有大佬遇到我这样的问题嘛 或者有啥其他的解决法子嘛
CREATE TABLE ccc_test_20220630_2
(
trans_number STRING,
end_timestamp STRING,
return_flag STRING,
commodity_type STRING
)
COMMENT '中台交易流水小票头' WITH (
'connector' = 'kafka',
'topic' = 'yh_rme_soc_stream_prod-tlog_header',
'properties.bootstrap.servers' = '****',
'properties.group.id' = 'ccc_test_20220630_2',
'properties.request.timeout.ms' = '60000',
'format' = 'json',
'scan.startup.mode' = 'group-offsets',
-- 'scan.startup.mode' = 'timestamp',
-- 'scan.startup.timestamp-millis' = '1653739200000',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true'
'json.infer-schema.flatten-nested-columns.enable'='true'
);
| |
小昌
|
|
ccc0606fighting@163.com
|
Re:flink sql解析kafka数据
Posted by RS <ti...@163.com>.
Hi,
你可以在定义表ccc_test_20220630_2字段的时候,结构如果固定,可以指定字段类型为ARRAY+ROW吧,例如 abc ARRAY<ROW<key1 STRING, key2 STRING>>,如果里面是动态结构,可以定义为MAP
结构如果比较复杂,或者字段不明确,就自定义UDF解决。
Thanks
在 2022-06-30 15:02:55,"小昌同学" <cc...@163.com> 写道:
各位大佬 请教一下就是我kafka的数据是这样的嵌套格式 ARRAY里面嵌套了ROW类型 我这边想直接通过flink sql建表语句拿到最里面的字段的值
我百度找到了 'json.infer-schema.flatten-nested-columns.enable'='true' 但是我在客户端执行的时候
发现识别不到这个字段
有大佬遇到我这样的问题嘛 或者有啥其他的解决法子嘛
CREATE TABLE ccc_test_20220630_2
(
trans_number STRING,
end_timestamp STRING,
return_flag STRING,
commodity_type STRING
)
COMMENT '中台交易流水小票头' WITH (
'connector' = 'kafka',
'topic' = 'yh_rme_soc_stream_prod-tlog_header',
'properties.bootstrap.servers' = '****',
'properties.group.id' = 'ccc_test_20220630_2',
'properties.request.timeout.ms' = '60000',
'format' = 'json',
'scan.startup.mode' = 'group-offsets',
-- 'scan.startup.mode' = 'timestamp',
-- 'scan.startup.timestamp-millis' = '1653739200000',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true'
'json.infer-schema.flatten-nested-columns.enable'='true'
);
| |
小昌
|
|
ccc0606fighting@163.com
|
Re:flink sql解析kafka数据
Posted by Xuyang <xy...@163.com>.
Hi, 目前我在flink master上没找到这个参数'json.infer-schema.flatten-nested-columns.enable'='true'。
你可以试一下在source读完整数据,然后通过UDF手动展开潜逃类型。
在 2022-06-30 15:02:55,"小昌同学" <cc...@163.com> 写道:
各位大佬 请教一下就是我kafka的数据是这样的嵌套格式 ARRAY里面嵌套了ROW类型 我这边想直接通过flink sql建表语句拿到最里面的字段的值
我百度找到了 'json.infer-schema.flatten-nested-columns.enable'='true' 但是我在客户端执行的时候
发现识别不到这个字段
有大佬遇到我这样的问题嘛 或者有啥其他的解决法子嘛
CREATE TABLE ccc_test_20220630_2
(
trans_number STRING,
end_timestamp STRING,
return_flag STRING,
commodity_type STRING
)
COMMENT '中台交易流水小票头' WITH (
'connector' = 'kafka',
'topic' = 'yh_rme_soc_stream_prod-tlog_header',
'properties.bootstrap.servers' = '****',
'properties.group.id' = 'ccc_test_20220630_2',
'properties.request.timeout.ms' = '60000',
'format' = 'json',
'scan.startup.mode' = 'group-offsets',
-- 'scan.startup.mode' = 'timestamp',
-- 'scan.startup.timestamp-millis' = '1653739200000',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true'
'json.infer-schema.flatten-nested-columns.enable'='true'
);
| |
小昌
|
|
ccc0606fighting@163.com
|
Re: flink sql解析kafka数据
Posted by 林影 <br...@gmail.com>.
Hi, 'json.infer-schema.flatten-nested-columns.enable'='true'
这个参数不是属于社区Flink 的feature,是阿里云的vvr flink引擎才支持的参数。
JasonLee <17...@163.com> 于2022年7月5日周二 11:33写道:
> Hi
> 解析嵌套 JSON 可以参考这篇文章哈,https://mp.weixin.qq.com/s/KHVUlOsLSHPzCprRWSJYcA
>
>
> Best
> JasonLee
>
>
> ---- 回复的原邮件 ----
> | 发件人 | 小昌同学<cc...@163.com> |
> | 发送日期 | 2022年06月30日 15:02 |
> | 收件人 | user-zh@flink.apache.org<us...@flink.apache.org> |
> | 主题 | flink sql解析kafka数据 |
> 各位大佬 请教一下就是我kafka的数据是这样的嵌套格式 ARRAY里面嵌套了ROW类型 我这边想直接通过flink
> sql建表语句拿到最里面的字段的值
> 我百度找到了 'json.infer-schema.flatten-nested-columns.enable'='true'
> 但是我在客户端执行的时候
> 发现识别不到这个字段
> 有大佬遇到我这样的问题嘛 或者有啥其他的解决法子嘛
> CREATE TABLE ccc_test_20220630_2
> (
> trans_number STRING,
> end_timestamp STRING,
> return_flag STRING,
> commodity_type STRING
>
>
> )
> COMMENT '中台交易流水小票头' WITH (
> 'connector' = 'kafka',
> 'topic' = 'yh_rme_soc_stream_prod-tlog_header',
> 'properties.bootstrap.servers' = '****',
> 'properties.group.id' = 'ccc_test_20220630_2',
> 'properties.request.timeout.ms' = '60000',
> 'format' = 'json',
> 'scan.startup.mode' = 'group-offsets',
> -- 'scan.startup.mode' = 'timestamp',
> -- 'scan.startup.timestamp-millis' = '1653739200000',
> 'json.fail-on-missing-field' = 'false',
> 'json.ignore-parse-errors' = 'true'
> 'json.infer-schema.flatten-nested-columns.enable'='true'
> );
>
>
> | |
> 小昌
> |
> |
> ccc0606fighting@163.com
> |
回复:flink sql解析kafka数据
Posted by JasonLee <17...@163.com>.
Hi
解析嵌套 JSON 可以参考这篇文章哈,https://mp.weixin.qq.com/s/KHVUlOsLSHPzCprRWSJYcA
Best
JasonLee
---- 回复的原邮件 ----
| 发件人 | 小昌同学<cc...@163.com> |
| 发送日期 | 2022年06月30日 15:02 |
| 收件人 | user-zh@flink.apache.org<us...@flink.apache.org> |
| 主题 | flink sql解析kafka数据 |
各位大佬 请教一下就是我kafka的数据是这样的嵌套格式 ARRAY里面嵌套了ROW类型 我这边想直接通过flink sql建表语句拿到最里面的字段的值
我百度找到了 'json.infer-schema.flatten-nested-columns.enable'='true' 但是我在客户端执行的时候
发现识别不到这个字段
有大佬遇到我这样的问题嘛 或者有啥其他的解决法子嘛
CREATE TABLE ccc_test_20220630_2
(
trans_number STRING,
end_timestamp STRING,
return_flag STRING,
commodity_type STRING
)
COMMENT '中台交易流水小票头' WITH (
'connector' = 'kafka',
'topic' = 'yh_rme_soc_stream_prod-tlog_header',
'properties.bootstrap.servers' = '****',
'properties.group.id' = 'ccc_test_20220630_2',
'properties.request.timeout.ms' = '60000',
'format' = 'json',
'scan.startup.mode' = 'group-offsets',
-- 'scan.startup.mode' = 'timestamp',
-- 'scan.startup.timestamp-millis' = '1653739200000',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true'
'json.infer-schema.flatten-nested-columns.enable'='true'
);
| |
小昌
|
|
ccc0606fighting@163.com
|