Posted to dev@flink.apache.org by Zain Haider Nemati <za...@retailo.co> on 2022/05/21 06:58:33 UTC

Json Deserialize in DataStream API with array length not fixed

Hi Folks,
I have data coming in this format:

{
    "data": {
        "oid__id":  "61de4f26f01131783f162453",
        "array_coordinates":    "[ { \"speed\" : \"xxx\", \"accuracy\" : \"xxx\", \"bearing\" : \"xxx\", \"altitude\" : \"xxx\", \"longitude\" : \"xxx\", \"latitude\" : \"xxx\", \"dateTimeStamp\" : \"xxx\", \"_id\" : { \"$oid\" : \"xxx\" } }, { \"speed\" : \"xxx\", \"isFromMockProvider\" : \"false\", \"accuracy\" : \"xxx\", \"bearing\" : \"xxx\", \"altitude\" : \"xxx\", \"longitude\" : \"xxx\", \"latitude\" : \"xxx\", \"dateTimeStamp\" : \"xxx\", \"_id\" : { \"$oid\" : \"xxx\" } }]",
        "batchId":  "xxx",
        "agentId":  "xxx",
        "routeKey": "40042-12-01-2022",
        "__v":  0
    },
    "metadata": {
        "timestamp":    "2022-05-02T18:49:52.619827Z",
        "record-type":  "data",
        "operation":    "load",
        "partition-key-type":   "primary-key",
        "schema-name":  "xxx",
        "table-name":   "xxx"
    }
}

The length of the array_coordinates array is not fixed in the source. Is
there any way to define a JSON deserializer for this? I would really
appreciate some help with this.

Re: Json Deserialize in DataStream API with array length not fixed

Posted by Shengkai Fang <fs...@gmail.com>.
Hi.

In SQL, you can declare the `array_coordinates` field with an ARRAY type
[1]. For example,

```
CREATE TABLE source(
     `array_coordinates` ARRAY<ROW<`speed` STRING, ...>>
) WITH (
   'format' = 'json'
)
```

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/json/



Re: Json Deserialize in DataStream API with array length not fixed

Posted by Qingsheng Ren <re...@gmail.com>.
Hi Zain,

I assume you are using the DataStream API, as described in the subject of your email, so you can define any functions/transformations to parse the JSON value, even if the schema changes.

It looks like the value of the field "array_coordinates" is an escaped JSON-formatted STRING instead of a JSON object, so I would parse the input JSON string first using Jackson (or any JSON parser you like), extract the field "array_coordinates" as a string, un-escape it, and use Jackson again to parse it.
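In a Flink job this would typically live inside a MapFunction using Jackson; the double-parse itself can be sketched with Python's stdlib json module (the field names come from the sample payload above, and the concrete values are placeholders):

```python
import json

def parse_coordinates(raw: str) -> list:
    """Extract the variable-length array from the escaped inner string."""
    outer = json.loads(raw)                     # 1st parse: the outer envelope
    inner = outer["data"]["array_coordinates"]  # still a string at this point
    # The first parse already un-escaped the \" sequences, so the extracted
    # value is plain JSON text and can be handed straight to the parser again.
    return json.loads(inner)                    # 2nd parse: the actual array

# Abbreviated sample in the shape of the payload above (placeholder values).
raw = (
    '{"data": {"oid__id": "61de4f26f01131783f162453",'
    ' "array_coordinates": "[ { \\"speed\\" : \\"1.0\\", \\"latitude\\" : \\"2.0\\" },'
    ' { \\"speed\\" : \\"3.0\\", \\"latitude\\" : \\"4.0\\" } ]",'
    ' "batchId": "b-1"}}'
)
coords = parse_coordinates(raw)
print(len(coords), coords[0]["speed"])  # any array length works: 2 1.0
```

Because the second parse only happens after the field has been extracted, the number of elements in the array never needs to be known in advance.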

If you are using the Table / SQL API, I'm afraid you have to use a UDTF to parse the input, because the schema varies in the field "array_coordinates".

Hope this could be helpful!

Cheers, 

Qingsheng


