You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by JasonLee <17...@163.com> on 2022/07/05 03:33:22 UTC

回复:flink sql解析kafka数据

Hi
解析嵌套 JSON 可以参考这篇文章哈,https://mp.weixin.qq.com/s/KHVUlOsLSHPzCprRWSJYcA


Best
JasonLee


---- 回复的原邮件 ----
| 发件人 | 小昌同学<cc...@163.com> |
| 发送日期 | 2022年06月30日 15:02 |
| 收件人 | user-zh@flink.apache.org<us...@flink.apache.org> |
| 主题 | flink sql解析kafka数据 |
各位大佬  请教一下就是我kafka的数据是这样的嵌套格式 ARRAY里面嵌套了ROW类型  我这边想直接通过flink sql建表语句拿到最里面的字段的值
我百度找到了 'json.infer-schema.flatten-nested-columns.enable'='true'  但是我在客户端执行的时候  
发现识别不到这个字段  
有大佬遇到我这样的问题嘛 或者有啥其他的解决法子嘛
CREATE TABLE ccc_test_20220630_2
(  
               trans_number   STRING,
               end_timestamp  STRING,
               return_flag    STRING,
               commodity_type STRING

   
)
   COMMENT '中台交易流水小票头' WITH (
   'connector' = 'kafka',
   'topic' = 'yh_rme_soc_stream_prod-tlog_header',
   'properties.bootstrap.servers' = '****',
   'properties.group.id' = 'ccc_test_20220630_2',
   'properties.request.timeout.ms' = '60000',
   'format' = 'json',
   'scan.startup.mode' = 'group-offsets',
   -- 'scan.startup.mode' = 'timestamp',
   -- 'scan.startup.timestamp-millis' = '1653739200000',
   'json.fail-on-missing-field' = 'false',
   'json.ignore-parse-errors' = 'true'
'json.infer-schema.flatten-nested-columns.enable'='true'
);


| |
小昌
|
|
ccc0606fighting@163.com
|

Re: flink sql解析kafka数据

Posted by 林影 <br...@gmail.com>.
Hi, 'json.infer-schema.flatten-nested-columns.enable'='true'
这个参数不是属于社区Flink 的feature,是阿里云的vvr flink引擎才支持的参数。

JasonLee <17...@163.com> 于2022年7月5日周二 11:33写道:

> Hi
> 解析嵌套 JSON 可以参考这篇文章哈,https://mp.weixin.qq.com/s/KHVUlOsLSHPzCprRWSJYcA
>
>
> Best
> JasonLee
>
>
> ---- 回复的原邮件 ----
> | 发件人 | 小昌同学<cc...@163.com> |
> | 发送日期 | 2022年06月30日 15:02 |
> | 收件人 | user-zh@flink.apache.org<us...@flink.apache.org> |
> | 主题 | flink sql解析kafka数据 |
> 各位大佬  请教一下就是我kafka的数据是这样的嵌套格式 ARRAY里面嵌套了ROW类型  我这边想直接通过flink
> sql建表语句拿到最里面的字段的值
> 我百度找到了 'json.infer-schema.flatten-nested-columns.enable'='true'
> 但是我在客户端执行的时候
> 发现识别不到这个字段
> 有大佬遇到我这样的问题嘛 或者有啥其他的解决法子嘛
> CREATE TABLE ccc_test_20220630_2
> (
>                trans_number   STRING,
>                end_timestamp  STRING,
>                return_flag    STRING,
>                commodity_type STRING
>
>
> )
>    COMMENT '中台交易流水小票头' WITH (
>    'connector' = 'kafka',
>    'topic' = 'yh_rme_soc_stream_prod-tlog_header',
>    'properties.bootstrap.servers' = '****',
>    'properties.group.id' = 'ccc_test_20220630_2',
>    'properties.request.timeout.ms' = '60000',
>    'format' = 'json',
>    'scan.startup.mode' = 'group-offsets',
>    -- 'scan.startup.mode' = 'timestamp',
>    -- 'scan.startup.timestamp-millis' = '1653739200000',
>    'json.fail-on-missing-field' = 'false',
>    'json.ignore-parse-errors' = 'true'
> 'json.infer-schema.flatten-nested-columns.enable'='true'
> );
>
>
> | |
> 小昌
> |
> |
> ccc0606fighting@163.com
> |