You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 大罗 <ti...@163.com> on 2020/09/08 02:38:45 UTC

how flink-sql connector kafka reads array json

hi,大家好,我遇到一个问题。

下游系统发过来的数据是json数组,比如[{"name": "daluo", "age": 1}, {"name": "xiaoming",
"age": 2}],我想使用'connector.type' = 'kafka' 阅读此类数据,应该如何写如下的sql?

CREATE TABLE mykafka1 (name String, age Int) 
WITH (
   'connector.type' = 'kafka',
   'format.type' = 'json',
   'update-mode' = 'append'
);

还是说,先使用原生的FlinkKafkaConsumer读取变成DataStream<List&lt;data>>,再转换flatMap转换成DataStream<data>,再使用tableEnv.fromDataStream把它变成tableSource?




--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: how flink-sql connector kafka reads array json

Posted by Benchao Li <li...@apache.org>.
Hi,

这个是一个已知的问题,已经有issue[1] 在跟进解决了。预计在1.12可以使用。

[1] https://issues.apache.org/jira/browse/FLINK-18590

大罗 <ti...@163.com> 于2020年9月8日周二 上午10:39写道:

> hi,大家好,我遇到一个问题。
>
> 下游系统发过来的数据是json数组,比如[{"name": "daluo", "age": 1}, {"name": "xiaoming",
> "age": 2}],我想使用'connector.type' = 'kafka' 阅读此类数据,应该如何写如下的sql?
>
> CREATE TABLE mykafka1 (name String, age Int)
> WITH (
>    'connector.type' = 'kafka',
>    'format.type' = 'json',
>    'update-mode' = 'append'
> );
>
>
> 还是说,先使用原生的FlinkKafkaConsumer读取变成DataStream<List&lt;data>>,再转换flatMap转换成DataStream<data>,再使用tableEnv.fromDataStream把它变成tableSource?
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


-- 

Best,
Benchao Li