Posted to user@flink.apache.org by 杨光 <la...@gmail.com> on 2019/03/04 07:38:22 UTC
flink sql about nested json
Hi,
I am trying to use the Flink SQL API to read JSON-format data from a Kafka topic.
My JSON schema is nested, like this:
{
  "type": "object",
  "properties": {
    "table": {
      "type": "string"
    },
    "str2": {
      "type": "string"
    },
    "obj1": {
      "type": "object",
      "properties": {
        "rkey": {
          "type": "string"
        },
        "val": {
          "type": "string"
        },
        "lastTime": {
          "type": "number"
        }
      },
      "required": ["lastTime", "rkey", "val"]
    },
    "obj2": {
      "type": "object",
      "properties": {
        "val": {
          "type": "string"
        },
        "lastTime": {
          "type": "number"
        }
      },
      "required": ["lastTime", "val"]
    }
  },
  "required": ["table", "str2", "obj1", "obj2"]
}
I define a table schema like this:
Schema schemaDesc1 = new Schema()
    .......
    .field("tablestr", Types.STRING).from("table")
    .......
    .field("rkey", Types.STRING).from("rkey");
When I run a debug case, I get an error about the "rkey" field (the field in the nested obj1):
"SQL validation failed. Table field 'rkey' was resolved to TableSource
return type field 'rkey', but field 'rkey' was not found in the return type
Row".
My question is: does the org.apache.flink.table.descriptors.Json format
support nested JSON schemas? If it does, how can I set the right format or
schema? If not, how can I apply the Flink SQL API to a nested JSON data
source?
Re: flink sql about nested json
Posted by 杨光 <la...@gmail.com>.
Hi Timo,
I got the nested value by changing the Schema definition like this:
Schema schemaDesc1 = new Schema()
    .field("str2", Types.STRING)
    .field("tablestr", Types.STRING).from("table")
    .field("obj1", Types.ROW_NAMED(
        new String[]{"rkey", "val", "lastTime"},
        Types.STRING, Types.STRING, Types.BIG_DEC));
and the SQL like this:
SELECT tablestr, obj1.rkey FROM mytable ...
So it looks like it is not a problem with the JSON parser format; it is
about how to define a nested Schema and select its fields in SQL.
Are there any documents where I can learn more detail about this?
Thanks!
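[Editor's note] Putting the pieces of this thread together, a complete source definition could look roughly like the sketch below. This assumes the Flink 1.7.x descriptor API; the Kafka connector settings (topic name, bootstrap servers) and the class name are placeholders, not taken from the original setup, and whether `deriveSchema()` plays well with `from()` renames may depend on the Flink version (cf. this thread), so treat it as a starting point only.

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;

public class NestedJsonExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv =
                TableEnvironment.getTableEnvironment(env);

        tableEnv.connect(new Kafka()
                    .version("universal")
                    .topic("my-topic")                          // placeholder
                    .property("bootstrap.servers", "localhost:9092"))
                .withFormat(new Json().deriveSchema())
                .withSchema(new Schema()
                    .field("str2", Types.STRING)
                    .field("tablestr", Types.STRING).from("table")
                    // The nested JSON object becomes a named ROW; its fields
                    // are then addressable in SQL with dot notation.
                    .field("obj1", Types.ROW_NAMED(
                        new String[]{"rkey", "val", "lastTime"},
                        Types.STRING, Types.STRING, Types.BIG_DEC)))
                .inAppendMode()
                .registerTableSource("mytable");

        // Dot notation reaches into the nested ROW.
        Table result = tableEnv.sqlQuery(
                "SELECT tablestr, obj1.rkey FROM mytable");
        // result can now be converted to a DataStream or written to a sink.
    }
}
```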
Timo Walther <tw...@apache.org> wrote on Tue, Mar 5, 2019 at 12:13 AM:
> Hi,
>
> Flink SQL's JSON format supports nested formats like the schema that you
> posted. Maybe the renaming with `from()` does not work as expected. Did you
> try it without `from()`, so that the schema fields are equal to the JSON fields?
>
> Alternatively, you could also define the schema only and use the
> `deriveSchema()` mode of the format.
>
> Btw there is a big bug in the JSON format that could affect how rows are
> parsed (https://issues.apache.org/jira/browse/FLINK-11727).
>
> Maybe it is worth writing your own format and performing the JSON
> parsing logic the way you would like it.
>
> Regards,
> Timo
>
> On 04.03.19 at 08:38, 杨光 wrote:
> > [original message snipped]
Re: flink sql about nested json
Posted by Timo Walther <tw...@apache.org>.
Hi,
Flink SQL's JSON format supports nested formats like the schema that you
posted. Maybe the renaming with `from()` does not work as expected. Did you
try it without `from()`, so that the schema fields are equal to the JSON fields?
Alternatively, you could also define only the table schema and use the
`deriveSchema()` mode of the format.
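[Editor's note] To make the `deriveSchema()` suggestion concrete, here is a sketch of what it could look like, assuming the Flink 1.7.x descriptor API; `kafka` stands for a `Kafka` connector descriptor configured elsewhere, and is an assumption of this sketch:

```java
// Sketch: no explicit jsonSchema() string; the JSON format derives its
// expected structure from the declared table schema below.
tableEnv.connect(kafka)
        .withFormat(new Json()
            .deriveSchema()                  // derive JSON schema from table schema
            .failOnMissingField(false))      // tolerate records missing fields
        .withSchema(new Schema()
            // With deriveSchema() the field names must match the JSON keys,
            // so "table" would need quoting as `table` in SQL queries.
            .field("table", Types.STRING)
            .field("str2", Types.STRING)
            .field("obj1", Types.ROW_NAMED(
                new String[]{"rkey", "val", "lastTime"},
                Types.STRING, Types.STRING, Types.BIG_DEC)))
        .inAppendMode()
        .registerTableSource("mytable");
```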
Btw, there is a big bug in the JSON format that could affect how rows are
parsed (https://issues.apache.org/jira/browse/FLINK-11727).
Maybe it is worth writing your own format and performing the JSON
parsing logic the way you would like it.
Regards,
Timo
On 04.03.19 at 08:38, 杨光 wrote:
> [original message snipped]