You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by 杨光 <la...@gmail.com> on 2019/03/04 07:38:22 UTC

flink sql about nested json

Hi,
i am trying the flink sql api to read json formate data from kafka topic.
My json schema is a nested json like this
{
  "type": "object",
  "properties": {
    "table": {
      "type": "string"
    },
    "str2": {
      "type": "string"
    },
    "obj1": {
      "type": "object",
      "properties": {
        "rkey": {
          "type": "string"
        },
        "val": {
          "type": "string"
        },
        "lastTime": {
          "type": "number"
        }
      },
      "required": ["lastTime", "rkey", "val"]
    },
    "obj2": {
      "type": "object",
      "properties": {
        "val": {
          "type": "string"
        },
        "lastTime": {
          "type": "number"
        }
      },
      "required": ["lastTime", "val"]
    }
  },
  "required": ["table", "str2", "obj1", "obj2"]
}

i define a table sechema like this.

Schema schemaDesc1 = new Schema()
        .......
        .field("tablestr", Types.STRING).from("table")
        .......
        .field("rkey", Types.STRING).from("rkey");


when i run a debug case ,i got error about the "rkey" field (the file in
the nest obj1)
" SQL validation failed. Table field 'rkey' was resolved to TableSource
return type field 'rkey', but field 'rkey' was not found in the return type
Row".

My question is :does the org.apache.flink.table.descriptors.Json format
support nested json schema? If does ,how can i set the right format or
schema ? If not ,then how can i apply flink sql api on nested json data
source.

Re: flink sql about nested json

Posted by 杨光 <la...@gmail.com>.
HI Timo
I have get the nested value by change the Schema definition like this
  Schema schemaDesc1 = new Schema()
            .field("str2", Types.STRING)
        .field("tablestr", Types.STRING).from("table")
       * .field("obj1", Types.ROW_NAMED(new
String[]{"rkey","val","lastTime"},
Types.STRING,Types.STRING,Types.BIG_DEC));*

  and the sql like this

  SELECT tablestr, *obj1.rkey*  from mytable ...

So it looks like is not the json parser format problem , it's about how to
define nested Schema and select them  in the sql.
Are there any documents i can learn more detail about this?
Thanks!

Timo Walther <tw...@apache.org> 于2019年3月5日周二 上午12:13写道:

> Hi,
>
> Flink SQL JSON format supports nested formats like the schema that you
> posted. Maybe the renaming with `from()` works not as expected. Did you
> try it without the `from()` where schema fields are equal to JSON fields?
>
> Alternatively, you could also define the schema only and use the
> `deriveSchema()` mode of the format.
>
> Btw there is a big bug in the JSON format that could affect how rows are
> parsed (https://issues.apache.org/jira/browse/FLINK-11727).
>
> Maybe it is worth it to write your own format and perform the JSON
> parsing logic how you would like it.
>
> Regards,
> Timo
>
> Am 04.03.19 um 08:38 schrieb 杨光:
> > Hi,
> > i am trying the flink sql api to read json formate data from kafka topic.
> > My json schema is a nested json like this
> > {
> >   "type": "object",
> >   "properties": {
> >     "table": {
> >       "type": "string"
> >     },
> >     "str2": {
> >       "type": "string"
> >     },
> >     "obj1": {
> >       "type": "object",
> >       "properties": {
> >         "rkey": {
> >           "type": "string"
> >         },
> >         "val": {
> >           "type": "string"
> >         },
> >         "lastTime": {
> >           "type": "number"
> >         }
> >       },
> >       "required": ["lastTime", "rkey", "val"]
> >     },
> >     "obj2": {
> >       "type": "object",
> >       "properties": {
> >         "val": {
> >           "type": "string"
> >         },
> >         "lastTime": {
> >           "type": "number"
> >         }
> >       },
> >       "required": ["lastTime", "val"]
> >     }
> >   },
> >   "required": ["table", "str2", "obj1", "obj2"]
> > }
> >
> > i define a table sechema like this.
> >
> > Schema schemaDesc1 = new Schema()
> >         .......
> >         .field("tablestr", Types.STRING).from("table")
> >         .......
> >         .field("rkey", Types.STRING).from("rkey");
> >
> >
> > when i run a debug case ,i got error about the "rkey" field (the file
> > in the nest obj1)
> > " SQL validation failed. Table field 'rkey' was resolved to
> > TableSource return type field 'rkey', but field 'rkey' was not found
> > in the return type Row".
> >
> > My question is :does the org.apache.flink.table.descriptors.Json
> > format support nested json schema? If does ,how can i set the right
> > format or schema ? If not ,then how can i apply flink sql api on
> > nested json data source.
>
>
>

Re: flink sql about nested json

Posted by 杨光 <la...@gmail.com>.
杨光 <la...@gmail.com>
下午3:22 (1分钟前)
发送至 Timo、 user
HI Timo
I have get the nested value by change the Schema definition like this
  Schema schemaDesc1 = new Schema()
            .field("str2", Types.STRING)
        .field("tablestr", Types.STRING).from("table")
       * .field("obj1", Types.ROW_NAMED(new
String[]{"rkey","val","lastTime"},
Types.STRING,Types.STRING,Types.BIG_DEC));*

  and the sql like this

  SELECT tablestr, *obj1.rkey*  from mytable ...

So it looks like is not the json parser format problem , it's about how to
define nested Schema and select them  in the sql.
Are there any documents i can learn more detail about this?
Thanks!

Timo Walther <tw...@apache.org> 于2019年3月5日周二 上午12:13写道:

> Hi,
>
> Flink SQL JSON format supports nested formats like the schema that you
> posted. Maybe the renaming with `from()` works not as expected. Did you
> try it without the `from()` where schema fields are equal to JSON fields?
>
> Alternatively, you could also define the schema only and use the
> `deriveSchema()` mode of the format.
>
> Btw there is a big bug in the JSON format that could affect how rows are
> parsed (https://issues.apache.org/jira/browse/FLINK-11727).
>
> Maybe it is worth it to write your own format and perform the JSON
> parsing logic how you would like it.
>
> Regards,
> Timo
>
> Am 04.03.19 um 08:38 schrieb 杨光:
> > Hi,
> > i am trying the flink sql api to read json formate data from kafka topic.
> > My json schema is a nested json like this
> > {
> >   "type": "object",
> >   "properties": {
> >     "table": {
> >       "type": "string"
> >     },
> >     "str2": {
> >       "type": "string"
> >     },
> >     "obj1": {
> >       "type": "object",
> >       "properties": {
> >         "rkey": {
> >           "type": "string"
> >         },
> >         "val": {
> >           "type": "string"
> >         },
> >         "lastTime": {
> >           "type": "number"
> >         }
> >       },
> >       "required": ["lastTime", "rkey", "val"]
> >     },
> >     "obj2": {
> >       "type": "object",
> >       "properties": {
> >         "val": {
> >           "type": "string"
> >         },
> >         "lastTime": {
> >           "type": "number"
> >         }
> >       },
> >       "required": ["lastTime", "val"]
> >     }
> >   },
> >   "required": ["table", "str2", "obj1", "obj2"]
> > }
> >
> > i define a table sechema like this.
> >
> > Schema schemaDesc1 = new Schema()
> >         .......
> >         .field("tablestr", Types.STRING).from("table")
> >         .......
> >         .field("rkey", Types.STRING).from("rkey");
> >
> >
> > when i run a debug case ,i got error about the "rkey" field (the file
> > in the nest obj1)
> > " SQL validation failed. Table field 'rkey' was resolved to
> > TableSource return type field 'rkey', but field 'rkey' was not found
> > in the return type Row".
> >
> > My question is :does the org.apache.flink.table.descriptors.Json
> > format support nested json schema? If does ,how can i set the right
> > format or schema ? If not ,then how can i apply flink sql api on
> > nested json data source.
>
>
>

Re: flink sql about nested json

Posted by Timo Walther <tw...@apache.org>.
Hi,

Flink SQL JSON format supports nested formats like the schema that you 
posted. Maybe the renaming with `from()` works not as expected. Did you 
try it without the `from()` where schema fields are equal to JSON fields?

Alternatively, you could also define the schema only and use the 
`deriveSchema()` mode of the format.

Btw there is a big bug in the JSON format that could affect how rows are 
parsed (https://issues.apache.org/jira/browse/FLINK-11727).

Maybe it is worth it to write your own format and perform the JSON 
parsing logic how you would like it.

Regards,
Timo

Am 04.03.19 um 08:38 schrieb 杨光:
> Hi,
> i am trying the flink sql api to read json formate data from kafka topic.
> My json schema is a nested json like this
> {
>   "type": "object",
>   "properties": {
>     "table": {
>       "type": "string"
>     },
>     "str2": {
>       "type": "string"
>     },
>     "obj1": {
>       "type": "object",
>       "properties": {
>         "rkey": {
>           "type": "string"
>         },
>         "val": {
>           "type": "string"
>         },
>         "lastTime": {
>           "type": "number"
>         }
>       },
>       "required": ["lastTime", "rkey", "val"]
>     },
>     "obj2": {
>       "type": "object",
>       "properties": {
>         "val": {
>           "type": "string"
>         },
>         "lastTime": {
>           "type": "number"
>         }
>       },
>       "required": ["lastTime", "val"]
>     }
>   },
>   "required": ["table", "str2", "obj1", "obj2"]
> }
>
> i define a table sechema like this.
>
> Schema schemaDesc1 = new Schema()
>         .......
>         .field("tablestr", Types.STRING).from("table")
>         .......
>         .field("rkey", Types.STRING).from("rkey");
>
>
> when i run a debug case ,i got error about the "rkey" field (the file 
> in the nest obj1)
> " SQL validation failed. Table field 'rkey' was resolved to 
> TableSource return type field 'rkey', but field 'rkey' was not found 
> in the return type Row".
>
> My question is :does the org.apache.flink.table.descriptors.Json 
> format support nested json schema? If does ,how can i set the right 
> format or schema ? If not ,then how can i apply flink sql api on 
> nested json data source.