Posted to dev@flink.apache.org by Zain Haider Nemati <za...@retailo.co> on 2022/05/21 06:58:33 UTC
Json Deserialize in DataStream API with array length not fixed
Hi Folks,
I have data coming in this format:
{
  "data": {
    "oid__id": "61de4f26f01131783f162453",
    "array_coordinates": "[ { \"speed\" : \"xxx\", \"accuracy\" : \"xxx\", \"bearing\" : \"xxx\", \"altitude\" : \"xxx\", \"longitude\" : \"xxx\", \"latitude\" : \"xxx\", \"dateTimeStamp\" : \"xxx\", \"_id\" : { \"$oid\" : \"xxx\" } }, { \"speed\" : \"xxx\", \"isFromMockProvider\" : \"false\", \"accuracy\" : \"xxx\", \"bearing\" : \"xxx\", \"altitude\" : \"xxx\", \"longitude\" : \"xxx\", \"latitude\" : \"xxx\", \"dateTimeStamp\" : \"xxx\", \"_id\" : { \"$oid\" : \"xxx\" } } ]",
    "batchId": "xxx",
    "agentId": "xxx",
    "routeKey": "40042-12-01-2022",
    "__v": 0
  },
  "metadata": {
    "timestamp": "2022-05-02T18:49:52.619827Z",
    "record-type": "data",
    "operation": "load",
    "partition-key-type": "primary-key",
    "schema-name": "xxx",
    "table-name": "xxx"
  }
}
The length of the array_coordinates array is not fixed at the source. Is
there any way to define a JSON deserializer for this? I would really
appreciate some help with this.
Re: Json Deserialize in DataStream API with array length not fixed
Posted by Shengkai Fang <fs...@gmail.com>.
Hi.
In SQL, you can simply declare `array_coordinates` as an ARRAY type [1]. For
example,
```
CREATE TABLE source(
`array_coordinates` ARRAY<ROW<`speed` STRING, ...>>
) WITH (
'format' = 'json'
)
```
[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/json/
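For completeness, a fuller version of that declaration, using the field names from the sample record, might look as follows. Note two assumptions: the connector value is a placeholder, and this only works if `array_coordinates` actually arrives as a JSON array (in the posted sample it is an escaped STRING, which the JSON format cannot unpack by itself):

```
CREATE TABLE source (
  `data` ROW<
    `oid__id` STRING,
    `array_coordinates` ARRAY<ROW<
      `speed` STRING,
      `isFromMockProvider` STRING,
      `accuracy` STRING,
      `bearing` STRING,
      `altitude` STRING,
      `longitude` STRING,
      `latitude` STRING,
      `dateTimeStamp` STRING,
      `_id` ROW<`$oid` STRING>
    >>,
    `batchId` STRING,
    `agentId` STRING,
    `routeKey` STRING,
    `__v` INT
  >,
  `metadata` ROW<
    `timestamp` STRING,
    `record-type` STRING,
    `operation` STRING,
    `partition-key-type` STRING,
    `schema-name` STRING,
    `table-name` STRING
  >
) WITH (
  'connector' = '...',
  'format' = 'json'
);
```

Elements missing a field (e.g. the first element has no `isFromMockProvider`) simply come out as NULL, so a varying array length or sparse fields are not a problem for an ARRAY column.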
Zain Haider Nemati <za...@retailo.co> wrote on Sat, May 21, 2022 at 14:59:
> [...]
Re: Json Deserialize in DataStream API with array length not fixed
Posted by Qingsheng Ren <re...@gmail.com>.
Hi Zain,
I assume you are using the DataStream API, as described in the subject of your email, so you can define any functions/transformations to parse the JSON value, even if the schema changes.
It looks like the value of the field “array_coordinates” is an escaped, JSON-formatted STRING rather than a JSON object, so I would parse the input JSON string first with Jackson (or any JSON parser you like), extract the field “array_coordinates” as a string (the parser un-escapes it while reading the value), and then use Jackson again to parse that string.
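That two-step parse (read the outer document, pull out “array_coordinates” as a string, then parse that string as JSON in its own right) can be sketched with any JSON library. A minimal sketch using Python's standard-library `json` module follows; Jackson in Java has the same shape, and the values here are shortened placeholders standing in for the sample record:

```python
import json

# A trimmed version of the record from the thread: "array_coordinates"
# holds an escaped JSON array as a STRING, not as a JSON array.
raw = '{"data": {"oid__id": "61de4f26f01131783f162453", "array_coordinates": "[ { \\"speed\\" : \\"10\\", \\"latitude\\" : \\"24.8\\" }, { \\"speed\\" : \\"12\\", \\"latitude\\" : \\"24.9\\" } ]"}}'

# Step 1: parse the outer document. The parser un-escapes the inner
# string while reading it, so no manual backslash stripping is needed.
outer = json.loads(raw)
coords_str = outer["data"]["array_coordinates"]

# Step 2: parse the extracted string as JSON in its own right.
coordinates = json.loads(coords_str)  # a list whose length may vary

print(len(coordinates))         # 2
print(coordinates[0]["speed"])  # 10
```

In a Flink job the same two steps would live inside a `MapFunction` or `FlatMapFunction`, emitting one record per array element if that suits the downstream operators better.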
If you are using the Table / SQL API, I'm afraid you have to use a UDTF to parse the input, because the schema varies inside the field “array_coordinates”.
Hope this could be helpful!
Cheers,
Qingsheng
> On May 21, 2022, at 14:58, Zain Haider Nemati <za...@retailo.co> wrote:
> [...]