You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by hua mulan <de...@outlook.com> on 2020/07/14 02:34:26 UTC
Flink SQL处理Array型的JSON
Hi
Kafka中的JSON结构是个Array例子如下。
[
{ "id": 1},
{ "id": 2}
]
读出来变成表的两行。Flink SQL层面最佳实践是什么?
如果没有办法是不是只能改JSON结构了。
可爱的木兰
Re: 回复: Flink SQL处理Array型的JSON
Posted by wxpcc <wx...@outlook.com>.
补充:
最终查询为
SELECT
t.*
FROM
kafka_source,
LATERAL TABLE( fromJson(data) ) as t
--
Sent from: http://apache-flink.147419.n8.nabble.com/
Re: 回复: Flink SQL处理Array型的JSON
Posted by wxpcc <wx...@outlook.com>.
如果不等待最新版本的话也可以这样
将 纯数组的数据作为字符串 从source消费,增加自定义的json解析函数,判断 isArray 之后 遍历进行 collect
if (Objects.nonNull(str)) {
if (isArray) {
JsonNode node = objectMapper.readTree(str);
if (node.isArray()) {
Iterator<JsonNode> nodeIterator = node.elements();
while (nodeIterator.hasNext()) {
collect(deserializationSchema.deserialize(nodeIterator.next().toString().getBytes()));
}
}
} else {
collect(deserializationSchema.deserialize(str.getBytes()));
}
}
--
Sent from: http://apache-flink.147419.n8.nabble.com/
回复: Flink SQL处理Array型的JSON
Posted by hua mulan <de...@outlook.com>.
Hi
那我觉得目前最佳实践就是,我用DataStream的API先把数据清洗成 json object in top level 在导入Kafka,之后再FlinkSQL 处理。
可爱的木兰
________________________________
发件人: Benchao Li <li...@apache.org>
发送时间: 2020年7月14日 11:00
收件人: user-zh <us...@flink.apache.org>
主题: Re: Flink SQL处理Array型的JSON
我感觉这是一个合理的需求,因为1.11之后我们支持了format返回多条数据,我们可以支持这种形式的数据了,
我建了一个issue[1].
[1] https://issues.apache.org/jira/browse/FLINK-18590
Leonard Xu <xb...@gmail.com> 于2020年7月14日周二 上午10:42写道:
> Hello,可爱的木兰
>
> 可以不用改json的,可以用 UNNEST 把数组拆成多行,也可以写UDTF自己解析对应字段,参考[1]
>
> SELECT users, tag
> FROM Orders CROSS JOIN UNNEST(tags) AS t (tag)
>
> Best,
> Leonard Xu
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/sql/queries.html
> <
> https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/sql/queries.html
> >
>
> > 在 2020年7月14日,10:34,hua mulan <de...@outlook.com> 写道:
> >
> > 可爱的木兰
>
>
--
Best,
Benchao Li
Re: Flink SQL处理Array型的JSON
Posted by Benchao Li <li...@apache.org>.
我感觉这是一个合理的需求,因为1.11之后我们支持了format返回多条数据,我们可以支持这种形式的数据了,
我建了一个issue[1].
[1] https://issues.apache.org/jira/browse/FLINK-18590
Leonard Xu <xb...@gmail.com> 于2020年7月14日周二 上午10:42写道:
> Hello,可爱的木兰
>
> 可以不用改json的,可以用 UNNEST 把数组拆成多行,也可以写UDTF自己解析对应字段,参考[1]
>
> SELECT users, tag
> FROM Orders CROSS JOIN UNNEST(tags) AS t (tag)
>
> Best,
> Leonard Xu
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/sql/queries.html
> <
> https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/sql/queries.html
> >
>
> > 在 2020年7月14日,10:34,hua mulan <de...@outlook.com> 写道:
> >
> > 可爱的木兰
>
>
--
Best,
Benchao Li
回复: Flink SQL处理Array型的JSON
Posted by hua mulan <de...@outlook.com>.
Hello,Leonard Xu
我这边JSON 不是
{
"id": 2,
"heap": [
{
"foo": 14,
"bar": "foo"
},
{
"foo": 16,
"bar": "bar"
}
],
}
而是直接一个Array
[
{
"foo": 14,
"bar": "foo"
},
{
"foo": 16,
"bar": "bar"
}
]
我发现DDL没法声明,SQL层面我不知道怎么做了。
可爱的木兰
________________________________
发件人: Leonard Xu <xb...@gmail.com>
发送时间: 2020年7月14日 10:42
收件人: user-zh <us...@flink.apache.org>
主题: Re: Flink SQL处理Array型的JSON
Hello,可爱的木兰
可以不用改json的,可以用 UNNEST 把数组拆成多行,也可以写UDTF自己解析对应字段,参考[1]
SELECT users, tag
FROM Orders CROSS JOIN UNNEST(tags) AS t (tag)
Best,
Leonard Xu
[1] https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/sql/queries.html <https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/sql/queries.html>
> 在 2020年7月14日,10:34,hua mulan <de...@outlook.com> 写道:
>
> 可爱的木兰
Re: Flink SQL处理Array型的JSON
Posted by Leonard Xu <xb...@gmail.com>.
Hello,可爱的木兰
可以不用改json的,可以用 UNNEST 把数组拆成多行,也可以写UDTF自己解析对应字段,参考[1]
SELECT users, tag
FROM Orders CROSS JOIN UNNEST(tags) AS t (tag)
Best,
Leonard Xu
[1] https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/sql/queries.html <https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/sql/queries.html>
> 在 2020年7月14日,10:34,hua mulan <de...@outlook.com> 写道:
>
> 可爱的木兰