You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 大罗 <ti...@163.com> on 2020/09/08 02:38:45 UTC
how flink-sql connector kafka reads array json
hi,大家好,我遇到一个问题。
下游系统发过来的数据是json数组,比如[{"name": "daluo", "age": 1}, {"name": "xiaoming",
"age": 2}],我想使用'connector.type' = 'kafka' 阅读此类数据,应该如何写如下的sql?
CREATE TABLE mykafka1 (name String, age Int)
WITH (
'connector.type' = 'kafka',
'format.type' = 'json',
'update-mode' = 'append'
);
还是说,先使用原生的FlinkKafkaConsumer读取变成DataStream<List<data>>,再转换flatMap转换成DataStream<data>,再使用tableEnv.fromDataStream把它变成tableSource?
--
Sent from: http://apache-flink.147419.n8.nabble.com/
Re: how flink-sql connector kafka reads array json
Posted by Benchao Li <li...@apache.org>.
Hi,
这个是一个已知的问题,已经有issue[1] 在跟进解决了。预计在1.12可以使用。
[1] https://issues.apache.org/jira/browse/FLINK-18590
大罗 <ti...@163.com> 于2020年9月8日周二 上午10:39写道:
> hi,大家好,我遇到一个问题。
>
> 下游系统发过来的数据是json数组,比如[{"name": "daluo", "age": 1}, {"name": "xiaoming",
> "age": 2}],我想使用'connector.type' = 'kafka' 阅读此类数据,应该如何写如下的sql?
>
> CREATE TABLE mykafka1 (name String, age Int)
> WITH (
> 'connector.type' = 'kafka',
> 'format.type' = 'json',
> 'update-mode' = 'append'
> );
>
>
> 还是说,先使用原生的FlinkKafkaConsumer读取变成DataStream<List<data>>,再转换flatMap转换成DataStream<data>,再使用tableEnv.fromDataStream把它变成tableSource?
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>
--
Best,
Benchao Li