You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 小昌同学 <cc...@163.com> on 2022/06/30 07:02:55 UTC

flink sql解析kafka数据

各位大佬  请教一下就是我kafka的数据是这样的嵌套格式 ARRAY里面嵌套了ROW类型  我这边想直接通过flink sql建表语句拿到最里面的字段的值
我百度找到了 'json.infer-schema.flatten-nested-columns.enable'='true'  但是我在客户端执行的时候  
发现识别不到这个字段  
有大佬遇到我这样的问题嘛 或者有啥其他的解决法子嘛
CREATE TABLE ccc_test_20220630_2
(  
               trans_number   STRING,
               end_timestamp  STRING,
               return_flag    STRING,
               commodity_type STRING

   
)
   COMMENT '中台交易流水小票头' WITH (
   'connector' = 'kafka',
   'topic' = 'yh_rme_soc_stream_prod-tlog_header',
   'properties.bootstrap.servers' = '****',
   'properties.group.id' = 'ccc_test_20220630_2',
   'properties.request.timeout.ms' = '60000',
   'format' = 'json',
   'scan.startup.mode' = 'group-offsets',
   -- 'scan.startup.mode' = 'timestamp',
   -- 'scan.startup.timestamp-millis' = '1653739200000',
   'json.fail-on-missing-field' = 'false',
   'json.ignore-parse-errors' = 'true'
'json.infer-schema.flatten-nested-columns.enable'='true'
);


| |
小昌
|
|
ccc0606fighting@163.com
|

Re:flink sql解析kafka数据

Posted by RS <ti...@163.com>.
Hi,


你可以在定义表ccc_test_20220630_2字段的时候,结构如果固定,可以指定字段类型为ARRAY+ROW吧,例如 abc ARRAY<ROW<key1 STRING, key2 STRING>>,如果里面是动态结构,可以定义为MAP
结构如果比较复杂,或者字段不明确,就自定义UDF解决。


Thanks




在 2022-06-30 15:02:55,"小昌同学" <cc...@163.com> 写道:

各位大佬  请教一下就是我kafka的数据是这样的嵌套格式 ARRAY里面嵌套了ROW类型  我这边想直接通过flink sql建表语句拿到最里面的字段的值
我百度找到了 'json.infer-schema.flatten-nested-columns.enable'='true'  但是我在客户端执行的时候  
发现识别不到这个字段  
有大佬遇到我这样的问题嘛 或者有啥其他的解决法子嘛
CREATE TABLE ccc_test_20220630_2
(  
               trans_number   STRING,
               end_timestamp  STRING,
               return_flag    STRING,
               commodity_type STRING

   
)
   COMMENT '中台交易流水小票头' WITH (
   'connector' = 'kafka',
   'topic' = 'yh_rme_soc_stream_prod-tlog_header',
   'properties.bootstrap.servers' = '****',
   'properties.group.id' = 'ccc_test_20220630_2',
   'properties.request.timeout.ms' = '60000',
   'format' = 'json',
   'scan.startup.mode' = 'group-offsets',
   -- 'scan.startup.mode' = 'timestamp',
   -- 'scan.startup.timestamp-millis' = '1653739200000',
   'json.fail-on-missing-field' = 'false',
   'json.ignore-parse-errors' = 'true'
'json.infer-schema.flatten-nested-columns.enable'='true'
);


| |
小昌
|
|
ccc0606fighting@163.com
|

Re:flink sql解析kafka数据

Posted by Xuyang <xy...@163.com>.
Hi, 目前我在flink master上没找到这个参数'json.infer-schema.flatten-nested-columns.enable'='true'。

你可以试一下在source读完整数据,然后通过UDF手动展开潜逃类型。



在 2022-06-30 15:02:55,"小昌同学" <cc...@163.com> 写道:

各位大佬  请教一下就是我kafka的数据是这样的嵌套格式 ARRAY里面嵌套了ROW类型  我这边想直接通过flink sql建表语句拿到最里面的字段的值
我百度找到了 'json.infer-schema.flatten-nested-columns.enable'='true'  但是我在客户端执行的时候  
发现识别不到这个字段  
有大佬遇到我这样的问题嘛 或者有啥其他的解决法子嘛
CREATE TABLE ccc_test_20220630_2
(  
               trans_number   STRING,
               end_timestamp  STRING,
               return_flag    STRING,
               commodity_type STRING

   
)
   COMMENT '中台交易流水小票头' WITH (
   'connector' = 'kafka',
   'topic' = 'yh_rme_soc_stream_prod-tlog_header',
   'properties.bootstrap.servers' = '****',
   'properties.group.id' = 'ccc_test_20220630_2',
   'properties.request.timeout.ms' = '60000',
   'format' = 'json',
   'scan.startup.mode' = 'group-offsets',
   -- 'scan.startup.mode' = 'timestamp',
   -- 'scan.startup.timestamp-millis' = '1653739200000',
   'json.fail-on-missing-field' = 'false',
   'json.ignore-parse-errors' = 'true'
'json.infer-schema.flatten-nested-columns.enable'='true'
);


| |
小昌
|
|
ccc0606fighting@163.com
|

Re: flink sql解析kafka数据

Posted by 林影 <br...@gmail.com>.
Hi, 'json.infer-schema.flatten-nested-columns.enable'='true'
这个参数不是属于社区Flink 的feature,是阿里云的vvr flink引擎才支持的参数。

JasonLee <17...@163.com> 于2022年7月5日周二 11:33写道:

> Hi
> 解析嵌套 JSON 可以参考这篇文章哈,https://mp.weixin.qq.com/s/KHVUlOsLSHPzCprRWSJYcA
>
>
> Best
> JasonLee
>
>
> ---- 回复的原邮件 ----
> | 发件人 | 小昌同学<cc...@163.com> |
> | 发送日期 | 2022年06月30日 15:02 |
> | 收件人 | user-zh@flink.apache.org<us...@flink.apache.org> |
> | 主题 | flink sql解析kafka数据 |
> 各位大佬  请教一下就是我kafka的数据是这样的嵌套格式 ARRAY里面嵌套了ROW类型  我这边想直接通过flink
> sql建表语句拿到最里面的字段的值
> 我百度找到了 'json.infer-schema.flatten-nested-columns.enable'='true'
> 但是我在客户端执行的时候
> 发现识别不到这个字段
> 有大佬遇到我这样的问题嘛 或者有啥其他的解决法子嘛
> CREATE TABLE ccc_test_20220630_2
> (
>                trans_number   STRING,
>                end_timestamp  STRING,
>                return_flag    STRING,
>                commodity_type STRING
>
>
> )
>    COMMENT '中台交易流水小票头' WITH (
>    'connector' = 'kafka',
>    'topic' = 'yh_rme_soc_stream_prod-tlog_header',
>    'properties.bootstrap.servers' = '****',
>    'properties.group.id' = 'ccc_test_20220630_2',
>    'properties.request.timeout.ms' = '60000',
>    'format' = 'json',
>    'scan.startup.mode' = 'group-offsets',
>    -- 'scan.startup.mode' = 'timestamp',
>    -- 'scan.startup.timestamp-millis' = '1653739200000',
>    'json.fail-on-missing-field' = 'false',
>    'json.ignore-parse-errors' = 'true'
> 'json.infer-schema.flatten-nested-columns.enable'='true'
> );
>
>
> | |
> 小昌
> |
> |
> ccc0606fighting@163.com
> |

回复:flink sql解析kafka数据

Posted by JasonLee <17...@163.com>.
Hi
解析嵌套 JSON 可以参考这篇文章哈,https://mp.weixin.qq.com/s/KHVUlOsLSHPzCprRWSJYcA


Best
JasonLee


---- 回复的原邮件 ----
| 发件人 | 小昌同学<cc...@163.com> |
| 发送日期 | 2022年06月30日 15:02 |
| 收件人 | user-zh@flink.apache.org<us...@flink.apache.org> |
| 主题 | flink sql解析kafka数据 |
各位大佬  请教一下就是我kafka的数据是这样的嵌套格式 ARRAY里面嵌套了ROW类型  我这边想直接通过flink sql建表语句拿到最里面的字段的值
我百度找到了 'json.infer-schema.flatten-nested-columns.enable'='true'  但是我在客户端执行的时候  
发现识别不到这个字段  
有大佬遇到我这样的问题嘛 或者有啥其他的解决法子嘛
CREATE TABLE ccc_test_20220630_2
(  
               trans_number   STRING,
               end_timestamp  STRING,
               return_flag    STRING,
               commodity_type STRING

   
)
   COMMENT '中台交易流水小票头' WITH (
   'connector' = 'kafka',
   'topic' = 'yh_rme_soc_stream_prod-tlog_header',
   'properties.bootstrap.servers' = '****',
   'properties.group.id' = 'ccc_test_20220630_2',
   'properties.request.timeout.ms' = '60000',
   'format' = 'json',
   'scan.startup.mode' = 'group-offsets',
   -- 'scan.startup.mode' = 'timestamp',
   -- 'scan.startup.timestamp-millis' = '1653739200000',
   'json.fail-on-missing-field' = 'false',
   'json.ignore-parse-errors' = 'true'
'json.infer-schema.flatten-nested-columns.enable'='true'
);


| |
小昌
|
|
ccc0606fighting@163.com
|