You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Guodong Wang <wa...@gmail.com> on 2020/05/28 10:31:55 UTC

How to create schema for flexible json data in Flink SQL

Hi !

I want to use Flink SQL to process some json events. It is quite
challenging to define a schema for the Flink SQL table.

My data source's format is some json like this
{
"top_level_key1": "some value",
"nested_object": {
"nested_key1": "abc",
"nested_key2": 123,
"nested_key3": ["element1", "element2", "element3"]
}
}

The big challenges for me to define a schema for the data source are
1. the keys in nested_object are flexible, there might be 3 unique keys or
more unique keys. If I enumerate all the keys in the schema, I think my
code is fragile, how to handle event which contains more  nested_keys in
nested_object ?
2. I know table api support Map type, but I am not sure if I can put
generic object as the value of the map. Because the values in nested_object
are of different types, some of them are int, some of them are string or
array.

So. how to expose this kind of json data as table in Flink SQL without
enumerating all the nested_keys?

Thanks.

Guodong

Re: How to create schema for flexible json data in Flink SQL

Posted by Guodong Wang <wa...@gmail.com>.
Hi Jark,

You totally got my point. Actually, the perfect solution in my opinion is
to support schema evolution in one query.
Although classic SQL needs to know the schema before do any computing, when
integrating the nosql data source to flink datastream, if schema evolution
is possible, it will save tons of time for user.
For example, when I have some json docs in mongodb, I want to expose the
collections as tables in flink SQL. But aligning the schema in flink
catalog service is not very friendly, I need to remember to update the
catalog when I add a new field in my database.

Although, it is not easy to validate SQL correctly if there is no schema
information about the table, for example "select sum(amount) from my_table
group by category", if the amount field is not number, runtime error will
be thrown.
I think this is another challenge about supporting schema evolution.
anyway, I think deferring the errors to runtime is fair when user wants to
have schema flexibility.


Guodong


On Mon, Jun 1, 2020 at 12:29 PM Jark Wu <im...@gmail.com> wrote:

> Hi all,
>
> This is an interesting topic. Schema inference will be the next big
> feature planned in the next release.
> I added this thread link into FLINK-16420.
>
> I think the case of Guodong is schema evolution, which I think there is
> something to do with schema inference.
> I don't have a clear idea for this yet, but some initial thoughts are:
>
> 1) schema inference can happen for each query, instead of when creating
> table.
>     So that, once data schema is evolved, the catalog table can have the
> new schema.
>     However, this may break existing queries on this catalog table (e.g.
> SELECT * FROM T).
> 2) manually create a new table with schema inference, we can use LIKE
> grammer or SHOW CREATE TABLE to
>     help creating a table based on existing ones. The new table have the
> new schema because we re-infer schema again.
> 3) auto-matically create a new tabel with schema inference. This can be
> done with some catalogs, for example, SchemaRegistryCatalog,
>     once a new avro schema (say schema id = 100) is added to the registry,
> users can use this new schema with table "mytopic-100".
>
>
> Best,
> Jark
>
>
> On Fri, 29 May 2020 at 22:05, Guodong Wang <wa...@gmail.com> wrote:
>
>> Benchao,
>>
>> Thank you for your detailed explanation.
>>
>> Schema Inference can solve my problem partially. For example, starting
>> from some time, all the json afterward will contain a new field. I think
>> for this case, schema inference will help.
>> but if I need to handle all the json events with different schemas in one
>> table(this is the case 2),  I agree with you. Schema inference does not
>> help either.
>>
>>
>>
>> Guodong
>>
>>
>> On Fri, May 29, 2020 at 11:02 AM Benchao Li <li...@gmail.com> wrote:
>>
>>> Hi Guodong,
>>>
>>> After an offline discussion with Leonard. I think you get the right
>>> meaning of schema inference.
>>> But there are two problems here:
>>> 1. schema of the data is fixed, schema inference can save your effort to
>>> write the schema explicitly.
>>> 2. schema of the data is dynamic, in this case the schema inference
>>> cannot help. Because SQL is somewhat static language, which should know all
>>> the data types at compile stage.
>>>
>>> Maybe I've misunderstood your question at the very beginning. I thought
>>> your case is #2. If your case is #1, then schema inference is a good
>>> choice.
>>>
>>> Guodong Wang <wa...@gmail.com> 于2020年5月28日周四 下午11:39写道:
>>>
>>>> Yes. Setting the value type as raw is one possible approach. And I
>>>> would like to vote for schema inference as well.
>>>>
>>>> Correct me if I am wrong, IMO schema inference means I can provide a
>>>> method in the table source to infer the data schema base on the runtime
>>>> computation. Just like some calcite adaptor does. Right?
>>>> For SQL table registration, I think that requiring the table source to
>>>> provide a static schema might be too strict. Let planner to infer the table
>>>> schema will be more flexible.
>>>>
>>>> Thank you for your suggestions.
>>>>
>>>> Guodong
>>>>
>>>>
>>>> On Thu, May 28, 2020 at 11:11 PM Benchao Li <li...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Guodong,
>>>>>
>>>>> Does the RAW type meet your requirements? For example, you can specify
>>>>> map<varchar, raw> type, and the value for the map is the raw JsonNode
>>>>> parsed from Jackson.
>>>>> This is not supported yet, however IMO this could be supported.
>>>>>
>>>>> Guodong Wang <wa...@gmail.com> 于2020年5月28日周四 下午9:43写道:
>>>>>
>>>>>> Benchao,
>>>>>>
>>>>>> Thank you for your quick reply.
>>>>>>
>>>>>> As you mentioned, for current scenario, approach 2 should work for
>>>>>> me. But it is a little bit annoying that I have to modify schema to add new
>>>>>> field types when upstream app changes the json format or adds new fields.
>>>>>> Otherwise, my user can not refer the field in their SQL.
>>>>>>
>>>>>> Per description in the jira, I think after implementing this, all the
>>>>>> json values will be converted as strings.
>>>>>> I am wondering if Flink SQL can/will support the flexible schema in
>>>>>> the future, for example, register the table without defining
>>>>>> specific schema for each field, to let user define a generic map or array
>>>>>> for one field. but the value of map/array can be any object. Then, the type
>>>>>> conversion cost might be saved.
>>>>>>
>>>>>> Guodong
>>>>>>
>>>>>>
>>>>>> On Thu, May 28, 2020 at 7:43 PM Benchao Li <li...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Guodong,
>>>>>>>
>>>>>>> I think you almost get the answer,
>>>>>>> 1. map type, it's not working for current implementation. For
>>>>>>> example, use map<varchar, varchar>, if the value if non-string json object,
>>>>>>> then `JsonNode.asText()` may not work as you wish.
>>>>>>> 2. list all fields you cares. IMO, this can fit your scenario. And
>>>>>>> you can set format.fail-on-missing-field = true, to allow setting
>>>>>>> non-existed fields to be null.
>>>>>>>
>>>>>>> For 1, I think maybe we can support it in the future, and I've
>>>>>>> created jira[1] to track this.
>>>>>>>
>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-18002
>>>>>>>
>>>>>>> Guodong Wang <wa...@gmail.com> 于2020年5月28日周四 下午6:32写道:
>>>>>>>
>>>>>>>> Hi !
>>>>>>>>
>>>>>>>> I want to use Flink SQL to process some json events. It is quite
>>>>>>>> challenging to define a schema for the Flink SQL table.
>>>>>>>>
>>>>>>>> My data source's format is some json like this
>>>>>>>> {
>>>>>>>> "top_level_key1": "some value",
>>>>>>>> "nested_object": {
>>>>>>>> "nested_key1": "abc",
>>>>>>>> "nested_key2": 123,
>>>>>>>> "nested_key3": ["element1", "element2", "element3"]
>>>>>>>> }
>>>>>>>> }
>>>>>>>>
>>>>>>>> The big challenges for me to define a schema for the data source are
>>>>>>>> 1. the keys in nested_object are flexible, there might be 3 unique
>>>>>>>> keys or more unique keys. If I enumerate all the keys in the schema, I
>>>>>>>> think my code is fragile, how to handle event which contains more
>>>>>>>> nested_keys in nested_object ?
>>>>>>>> 2. I know table api support Map type, but I am not sure if I can
>>>>>>>> put generic object as the value of the map. Because the values in
>>>>>>>> nested_object are of different types, some of them are int, some of them
>>>>>>>> are string or array.
>>>>>>>>
>>>>>>>> So. how to expose this kind of json data as table in Flink SQL
>>>>>>>> without enumerating all the nested_keys?
>>>>>>>>
>>>>>>>> Thanks.
>>>>>>>>
>>>>>>>> Guodong
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>>
>>>>>>> Best,
>>>>>>> Benchao Li
>>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>>
>>>>> Best,
>>>>> Benchao Li
>>>>>
>>>>
>>>
>>> --
>>>
>>> Best,
>>> Benchao Li
>>>
>>

Re: How to create schema for flexible json data in Flink SQL

Posted by Jark Wu <im...@gmail.com>.
Hi all,

This is an interesting topic. Schema inference will be the next big feature
planned in the next release.
I added this thread link into FLINK-16420.

I think the case of Guodong is schema evolution, which I think there is
something to do with schema inference.
I don't have a clear idea for this yet, but some initial thoughts are:

1) schema inference can happen for each query, instead of when creating
table.
    So that, once data schema is evolved, the catalog table can have the
new schema.
    However, this may break existing queries on this catalog table (e.g.
SELECT * FROM T).
2) manually create a new table with schema inference, we can use LIKE
grammer or SHOW CREATE TABLE to
    help creating a table based on existing ones. The new table have the
new schema because we re-infer schema again.
3) auto-matically create a new tabel with schema inference. This can be
done with some catalogs, for example, SchemaRegistryCatalog,
    once a new avro schema (say schema id = 100) is added to the registry,
users can use this new schema with table "mytopic-100".


Best,
Jark


On Fri, 29 May 2020 at 22:05, Guodong Wang <wa...@gmail.com> wrote:

> Benchao,
>
> Thank you for your detailed explanation.
>
> Schema Inference can solve my problem partially. For example, starting
> from some time, all the json afterward will contain a new field. I think
> for this case, schema inference will help.
> but if I need to handle all the json events with different schemas in one
> table(this is the case 2),  I agree with you. Schema inference does not
> help either.
>
>
>
> Guodong
>
>
> On Fri, May 29, 2020 at 11:02 AM Benchao Li <li...@gmail.com> wrote:
>
>> Hi Guodong,
>>
>> After an offline discussion with Leonard. I think you get the right
>> meaning of schema inference.
>> But there are two problems here:
>> 1. schema of the data is fixed, schema inference can save your effort to
>> write the schema explicitly.
>> 2. schema of the data is dynamic, in this case the schema inference
>> cannot help. Because SQL is somewhat static language, which should know all
>> the data types at compile stage.
>>
>> Maybe I've misunderstood your question at the very beginning. I thought
>> your case is #2. If your case is #1, then schema inference is a good
>> choice.
>>
>> Guodong Wang <wa...@gmail.com> 于2020年5月28日周四 下午11:39写道:
>>
>>> Yes. Setting the value type as raw is one possible approach. And I would
>>> like to vote for schema inference as well.
>>>
>>> Correct me if I am wrong, IMO schema inference means I can provide a
>>> method in the table source to infer the data schema base on the runtime
>>> computation. Just like some calcite adaptor does. Right?
>>> For SQL table registration, I think that requiring the table source to
>>> provide a static schema might be too strict. Let planner to infer the table
>>> schema will be more flexible.
>>>
>>> Thank you for your suggestions.
>>>
>>> Guodong
>>>
>>>
>>> On Thu, May 28, 2020 at 11:11 PM Benchao Li <li...@gmail.com> wrote:
>>>
>>>> Hi Guodong,
>>>>
>>>> Does the RAW type meet your requirements? For example, you can specify
>>>> map<varchar, raw> type, and the value for the map is the raw JsonNode
>>>> parsed from Jackson.
>>>> This is not supported yet, however IMO this could be supported.
>>>>
>>>> Guodong Wang <wa...@gmail.com> 于2020年5月28日周四 下午9:43写道:
>>>>
>>>>> Benchao,
>>>>>
>>>>> Thank you for your quick reply.
>>>>>
>>>>> As you mentioned, for current scenario, approach 2 should work for me.
>>>>> But it is a little bit annoying that I have to modify schema to add new
>>>>> field types when upstream app changes the json format or adds new fields.
>>>>> Otherwise, my user can not refer the field in their SQL.
>>>>>
>>>>> Per description in the jira, I think after implementing this, all the
>>>>> json values will be converted as strings.
>>>>> I am wondering if Flink SQL can/will support the flexible schema in
>>>>> the future, for example, register the table without defining
>>>>> specific schema for each field, to let user define a generic map or array
>>>>> for one field. but the value of map/array can be any object. Then, the type
>>>>> conversion cost might be saved.
>>>>>
>>>>> Guodong
>>>>>
>>>>>
>>>>> On Thu, May 28, 2020 at 7:43 PM Benchao Li <li...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Guodong,
>>>>>>
>>>>>> I think you almost get the answer,
>>>>>> 1. map type, it's not working for current implementation. For
>>>>>> example, use map<varchar, varchar>, if the value if non-string json object,
>>>>>> then `JsonNode.asText()` may not work as you wish.
>>>>>> 2. list all fields you cares. IMO, this can fit your scenario. And
>>>>>> you can set format.fail-on-missing-field = true, to allow setting
>>>>>> non-existed fields to be null.
>>>>>>
>>>>>> For 1, I think maybe we can support it in the future, and I've
>>>>>> created jira[1] to track this.
>>>>>>
>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-18002
>>>>>>
>>>>>> Guodong Wang <wa...@gmail.com> 于2020年5月28日周四 下午6:32写道:
>>>>>>
>>>>>>> Hi !
>>>>>>>
>>>>>>> I want to use Flink SQL to process some json events. It is quite
>>>>>>> challenging to define a schema for the Flink SQL table.
>>>>>>>
>>>>>>> My data source's format is some json like this
>>>>>>> {
>>>>>>> "top_level_key1": "some value",
>>>>>>> "nested_object": {
>>>>>>> "nested_key1": "abc",
>>>>>>> "nested_key2": 123,
>>>>>>> "nested_key3": ["element1", "element2", "element3"]
>>>>>>> }
>>>>>>> }
>>>>>>>
>>>>>>> The big challenges for me to define a schema for the data source are
>>>>>>> 1. the keys in nested_object are flexible, there might be 3 unique
>>>>>>> keys or more unique keys. If I enumerate all the keys in the schema, I
>>>>>>> think my code is fragile, how to handle event which contains more
>>>>>>> nested_keys in nested_object ?
>>>>>>> 2. I know table api support Map type, but I am not sure if I can put
>>>>>>> generic object as the value of the map. Because the values in nested_object
>>>>>>> are of different types, some of them are int, some of them are string or
>>>>>>> array.
>>>>>>>
>>>>>>> So. how to expose this kind of json data as table in Flink SQL
>>>>>>> without enumerating all the nested_keys?
>>>>>>>
>>>>>>> Thanks.
>>>>>>>
>>>>>>> Guodong
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>>
>>>>>> Best,
>>>>>> Benchao Li
>>>>>>
>>>>>
>>>>
>>>> --
>>>>
>>>> Best,
>>>> Benchao Li
>>>>
>>>
>>
>> --
>>
>> Best,
>> Benchao Li
>>
>

Re: How to create schema for flexible json data in Flink SQL

Posted by Guodong Wang <wa...@gmail.com>.
Benchao,

Thank you for your detailed explanation.

Schema Inference can solve my problem partially. For example, starting from
some time, all the json afterward will contain a new field. I think for
this case, schema inference will help.
but if I need to handle all the json events with different schemas in one
table(this is the case 2),  I agree with you. Schema inference does not
help either.



Guodong


On Fri, May 29, 2020 at 11:02 AM Benchao Li <li...@gmail.com> wrote:

> Hi Guodong,
>
> After an offline discussion with Leonard. I think you get the right
> meaning of schema inference.
> But there are two problems here:
> 1. schema of the data is fixed, schema inference can save your effort to
> write the schema explicitly.
> 2. schema of the data is dynamic, in this case the schema inference cannot
> help. Because SQL is somewhat static language, which should know all the
> data types at compile stage.
>
> Maybe I've misunderstood your question at the very beginning. I thought
> your case is #2. If your case is #1, then schema inference is a good
> choice.
>
> Guodong Wang <wa...@gmail.com> 于2020年5月28日周四 下午11:39写道:
>
>> Yes. Setting the value type as raw is one possible approach. And I would
>> like to vote for schema inference as well.
>>
>> Correct me if I am wrong, IMO schema inference means I can provide a
>> method in the table source to infer the data schema base on the runtime
>> computation. Just like some calcite adaptor does. Right?
>> For SQL table registration, I think that requiring the table source to
>> provide a static schema might be too strict. Let planner to infer the table
>> schema will be more flexible.
>>
>> Thank you for your suggestions.
>>
>> Guodong
>>
>>
>> On Thu, May 28, 2020 at 11:11 PM Benchao Li <li...@gmail.com> wrote:
>>
>>> Hi Guodong,
>>>
>>> Does the RAW type meet your requirements? For example, you can specify
>>> map<varchar, raw> type, and the value for the map is the raw JsonNode
>>> parsed from Jackson.
>>> This is not supported yet, however IMO this could be supported.
>>>
>>> Guodong Wang <wa...@gmail.com> 于2020年5月28日周四 下午9:43写道:
>>>
>>>> Benchao,
>>>>
>>>> Thank you for your quick reply.
>>>>
>>>> As you mentioned, for current scenario, approach 2 should work for me.
>>>> But it is a little bit annoying that I have to modify schema to add new
>>>> field types when upstream app changes the json format or adds new fields.
>>>> Otherwise, my user can not refer the field in their SQL.
>>>>
>>>> Per description in the jira, I think after implementing this, all the
>>>> json values will be converted as strings.
>>>> I am wondering if Flink SQL can/will support the flexible schema in the
>>>> future, for example, register the table without defining specific schema
>>>> for each field, to let user define a generic map or array for one field.
>>>> but the value of map/array can be any object. Then, the type conversion
>>>> cost might be saved.
>>>>
>>>> Guodong
>>>>
>>>>
>>>> On Thu, May 28, 2020 at 7:43 PM Benchao Li <li...@gmail.com> wrote:
>>>>
>>>>> Hi Guodong,
>>>>>
>>>>> I think you almost get the answer,
>>>>> 1. map type, it's not working for current implementation. For example,
>>>>> use map<varchar, varchar>, if the value if non-string json object, then
>>>>> `JsonNode.asText()` may not work as you wish.
>>>>> 2. list all fields you cares. IMO, this can fit your scenario. And you
>>>>> can set format.fail-on-missing-field = true, to allow setting non-existed
>>>>> fields to be null.
>>>>>
>>>>> For 1, I think maybe we can support it in the future, and I've created
>>>>> jira[1] to track this.
>>>>>
>>>>> [1] https://issues.apache.org/jira/browse/FLINK-18002
>>>>>
>>>>> Guodong Wang <wa...@gmail.com> 于2020年5月28日周四 下午6:32写道:
>>>>>
>>>>>> Hi !
>>>>>>
>>>>>> I want to use Flink SQL to process some json events. It is quite
>>>>>> challenging to define a schema for the Flink SQL table.
>>>>>>
>>>>>> My data source's format is some json like this
>>>>>> {
>>>>>> "top_level_key1": "some value",
>>>>>> "nested_object": {
>>>>>> "nested_key1": "abc",
>>>>>> "nested_key2": 123,
>>>>>> "nested_key3": ["element1", "element2", "element3"]
>>>>>> }
>>>>>> }
>>>>>>
>>>>>> The big challenges for me to define a schema for the data source are
>>>>>> 1. the keys in nested_object are flexible, there might be 3 unique
>>>>>> keys or more unique keys. If I enumerate all the keys in the schema, I
>>>>>> think my code is fragile, how to handle event which contains more
>>>>>> nested_keys in nested_object ?
>>>>>> 2. I know table api support Map type, but I am not sure if I can put
>>>>>> generic object as the value of the map. Because the values in nested_object
>>>>>> are of different types, some of them are int, some of them are string or
>>>>>> array.
>>>>>>
>>>>>> So. how to expose this kind of json data as table in Flink SQL
>>>>>> without enumerating all the nested_keys?
>>>>>>
>>>>>> Thanks.
>>>>>>
>>>>>> Guodong
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>>
>>>>> Best,
>>>>> Benchao Li
>>>>>
>>>>
>>>
>>> --
>>>
>>> Best,
>>> Benchao Li
>>>
>>
>
> --
>
> Best,
> Benchao Li
>

Re: How to create schema for flexible json data in Flink SQL

Posted by Benchao Li <li...@gmail.com>.
Hi Guodong,

After an offline discussion with Leonard. I think you get the right meaning
of schema inference.
But there are two problems here:
1. schema of the data is fixed, schema inference can save your effort to
write the schema explicitly.
2. schema of the data is dynamic, in this case the schema inference cannot
help. Because SQL is somewhat static language, which should know all the
data types at compile stage.

Maybe I've misunderstood your question at the very beginning. I thought
your case is #2. If your case is #1, then schema inference is a good
choice.

Guodong Wang <wa...@gmail.com> 于2020年5月28日周四 下午11:39写道:

> Yes. Setting the value type as raw is one possible approach. And I would
> like to vote for schema inference as well.
>
> Correct me if I am wrong, IMO schema inference means I can provide a
> method in the table source to infer the data schema base on the runtime
> computation. Just like some calcite adaptor does. Right?
> For SQL table registration, I think that requiring the table source to
> provide a static schema might be too strict. Let planner to infer the table
> schema will be more flexible.
>
> Thank you for your suggestions.
>
> Guodong
>
>
> On Thu, May 28, 2020 at 11:11 PM Benchao Li <li...@gmail.com> wrote:
>
>> Hi Guodong,
>>
>> Does the RAW type meet your requirements? For example, you can specify
>> map<varchar, raw> type, and the value for the map is the raw JsonNode
>> parsed from Jackson.
>> This is not supported yet, however IMO this could be supported.
>>
>> Guodong Wang <wa...@gmail.com> 于2020年5月28日周四 下午9:43写道:
>>
>>> Benchao,
>>>
>>> Thank you for your quick reply.
>>>
>>> As you mentioned, for current scenario, approach 2 should work for me.
>>> But it is a little bit annoying that I have to modify schema to add new
>>> field types when upstream app changes the json format or adds new fields.
>>> Otherwise, my user can not refer the field in their SQL.
>>>
>>> Per description in the jira, I think after implementing this, all the
>>> json values will be converted as strings.
>>> I am wondering if Flink SQL can/will support the flexible schema in the
>>> future, for example, register the table without defining specific schema
>>> for each field, to let user define a generic map or array for one field.
>>> but the value of map/array can be any object. Then, the type conversion
>>> cost might be saved.
>>>
>>> Guodong
>>>
>>>
>>> On Thu, May 28, 2020 at 7:43 PM Benchao Li <li...@gmail.com> wrote:
>>>
>>>> Hi Guodong,
>>>>
>>>> I think you almost get the answer,
>>>> 1. map type, it's not working for current implementation. For example,
>>>> use map<varchar, varchar>, if the value if non-string json object, then
>>>> `JsonNode.asText()` may not work as you wish.
>>>> 2. list all fields you cares. IMO, this can fit your scenario. And you
>>>> can set format.fail-on-missing-field = true, to allow setting non-existed
>>>> fields to be null.
>>>>
>>>> For 1, I think maybe we can support it in the future, and I've created
>>>> jira[1] to track this.
>>>>
>>>> [1] https://issues.apache.org/jira/browse/FLINK-18002
>>>>
>>>> Guodong Wang <wa...@gmail.com> 于2020年5月28日周四 下午6:32写道:
>>>>
>>>>> Hi !
>>>>>
>>>>> I want to use Flink SQL to process some json events. It is quite
>>>>> challenging to define a schema for the Flink SQL table.
>>>>>
>>>>> My data source's format is some json like this
>>>>> {
>>>>> "top_level_key1": "some value",
>>>>> "nested_object": {
>>>>> "nested_key1": "abc",
>>>>> "nested_key2": 123,
>>>>> "nested_key3": ["element1", "element2", "element3"]
>>>>> }
>>>>> }
>>>>>
>>>>> The big challenges for me to define a schema for the data source are
>>>>> 1. the keys in nested_object are flexible, there might be 3 unique
>>>>> keys or more unique keys. If I enumerate all the keys in the schema, I
>>>>> think my code is fragile, how to handle event which contains more
>>>>> nested_keys in nested_object ?
>>>>> 2. I know table api support Map type, but I am not sure if I can put
>>>>> generic object as the value of the map. Because the values in nested_object
>>>>> are of different types, some of them are int, some of them are string or
>>>>> array.
>>>>>
>>>>> So. how to expose this kind of json data as table in Flink SQL without
>>>>> enumerating all the nested_keys?
>>>>>
>>>>> Thanks.
>>>>>
>>>>> Guodong
>>>>>
>>>>
>>>>
>>>> --
>>>>
>>>> Best,
>>>> Benchao Li
>>>>
>>>
>>
>> --
>>
>> Best,
>> Benchao Li
>>
>

-- 

Best,
Benchao Li

Re: How to create schema for flexible json data in Flink SQL

Posted by Guodong Wang <wa...@gmail.com>.
Yes. Setting the value type as raw is one possible approach. And I would
like to vote for schema inference as well.

Correct me if I am wrong, IMO schema inference means I can provide a method
in the table source to infer the data schema base on the runtime
computation. Just like some calcite adaptor does. Right?
For SQL table registration, I think that requiring the table source to
provide a static schema might be too strict. Let planner to infer the table
schema will be more flexible.

Thank you for your suggestions.

Guodong


On Thu, May 28, 2020 at 11:11 PM Benchao Li <li...@gmail.com> wrote:

> Hi Guodong,
>
> Does the RAW type meet your requirements? For example, you can specify
> map<varchar, raw> type, and the value for the map is the raw JsonNode
> parsed from Jackson.
> This is not supported yet, however IMO this could be supported.
>
> Guodong Wang <wa...@gmail.com> 于2020年5月28日周四 下午9:43写道:
>
>> Benchao,
>>
>> Thank you for your quick reply.
>>
>> As you mentioned, for current scenario, approach 2 should work for me.
>> But it is a little bit annoying that I have to modify schema to add new
>> field types when upstream app changes the json format or adds new fields.
>> Otherwise, my user can not refer the field in their SQL.
>>
>> Per description in the jira, I think after implementing this, all the
>> json values will be converted as strings.
>> I am wondering if Flink SQL can/will support the flexible schema in the
>> future, for example, register the table without defining specific schema
>> for each field, to let user define a generic map or array for one field.
>> but the value of map/array can be any object. Then, the type conversion
>> cost might be saved.
>>
>> Guodong
>>
>>
>> On Thu, May 28, 2020 at 7:43 PM Benchao Li <li...@gmail.com> wrote:
>>
>>> Hi Guodong,
>>>
>>> I think you almost get the answer,
>>> 1. map type, it's not working for current implementation. For example,
>>> use map<varchar, varchar>, if the value if non-string json object, then
>>> `JsonNode.asText()` may not work as you wish.
>>> 2. list all fields you cares. IMO, this can fit your scenario. And you
>>> can set format.fail-on-missing-field = true, to allow setting non-existed
>>> fields to be null.
>>>
>>> For 1, I think maybe we can support it in the future, and I've created
>>> jira[1] to track this.
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-18002
>>>
>>> Guodong Wang <wa...@gmail.com> 于2020年5月28日周四 下午6:32写道:
>>>
>>>> Hi !
>>>>
>>>> I want to use Flink SQL to process some json events. It is quite
>>>> challenging to define a schema for the Flink SQL table.
>>>>
>>>> My data source's format is some json like this
>>>> {
>>>> "top_level_key1": "some value",
>>>> "nested_object": {
>>>> "nested_key1": "abc",
>>>> "nested_key2": 123,
>>>> "nested_key3": ["element1", "element2", "element3"]
>>>> }
>>>> }
>>>>
>>>> The big challenges for me to define a schema for the data source are
>>>> 1. the keys in nested_object are flexible, there might be 3 unique keys
>>>> or more unique keys. If I enumerate all the keys in the schema, I think my
>>>> code is fragile, how to handle event which contains more  nested_keys in
>>>> nested_object ?
>>>> 2. I know table api support Map type, but I am not sure if I can put
>>>> generic object as the value of the map. Because the values in nested_object
>>>> are of different types, some of them are int, some of them are string or
>>>> array.
>>>>
>>>> So. how to expose this kind of json data as table in Flink SQL without
>>>> enumerating all the nested_keys?
>>>>
>>>> Thanks.
>>>>
>>>> Guodong
>>>>
>>>
>>>
>>> --
>>>
>>> Best,
>>> Benchao Li
>>>
>>
>
> --
>
> Best,
> Benchao Li
>

Re: How to create schema for flexible json data in Flink SQL

Posted by Benchao Li <li...@gmail.com>.
Hi Guodong,

Does the RAW type meet your requirements? For example, you can specify
map<varchar, raw> type, and the value for the map is the raw JsonNode
parsed from Jackson.
This is not supported yet, however IMO this could be supported.

Guodong Wang <wa...@gmail.com> 于2020年5月28日周四 下午9:43写道:

> Benchao,
>
> Thank you for your quick reply.
>
> As you mentioned, for current scenario, approach 2 should work for me. But
> it is a little bit annoying that I have to modify schema to add new field
> types when upstream app changes the json format or adds new fields.
> Otherwise, my user can not refer the field in their SQL.
>
> Per description in the jira, I think after implementing this, all the json
> values will be converted as strings.
> I am wondering if Flink SQL can/will support the flexible schema in the
> future, for example, register the table without defining specific schema
> for each field, to let user define a generic map or array for one field.
> but the value of map/array can be any object. Then, the type conversion
> cost might be saved.
>
> Guodong
>
>
> On Thu, May 28, 2020 at 7:43 PM Benchao Li <li...@gmail.com> wrote:
>
>> Hi Guodong,
>>
>> I think you almost get the answer,
>> 1. map type, it's not working for current implementation. For example,
>> use map<varchar, varchar>, if the value if non-string json object, then
>> `JsonNode.asText()` may not work as you wish.
>> 2. list all fields you cares. IMO, this can fit your scenario. And you
>> can set format.fail-on-missing-field = true, to allow setting non-existed
>> fields to be null.
>>
>> For 1, I think maybe we can support it in the future, and I've created
>> jira[1] to track this.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-18002
>>
>> Guodong Wang <wa...@gmail.com> 于2020年5月28日周四 下午6:32写道:
>>
>>> Hi !
>>>
>>> I want to use Flink SQL to process some json events. It is quite
>>> challenging to define a schema for the Flink SQL table.
>>>
>>> My data source's format is some json like this
>>> {
>>> "top_level_key1": "some value",
>>> "nested_object": {
>>> "nested_key1": "abc",
>>> "nested_key2": 123,
>>> "nested_key3": ["element1", "element2", "element3"]
>>> }
>>> }
>>>
>>> The big challenges for me to define a schema for the data source are
>>> 1. the keys in nested_object are flexible, there might be 3 unique keys
>>> or more unique keys. If I enumerate all the keys in the schema, I think my
>>> code is fragile, how to handle event which contains more  nested_keys in
>>> nested_object ?
>>> 2. I know table api support Map type, but I am not sure if I can put
>>> generic object as the value of the map. Because the values in nested_object
>>> are of different types, some of them are int, some of them are string or
>>> array.
>>>
>>> So. how to expose this kind of json data as table in Flink SQL without
>>> enumerating all the nested_keys?
>>>
>>> Thanks.
>>>
>>> Guodong
>>>
>>
>>
>> --
>>
>> Best,
>> Benchao Li
>>
>

-- 

Best,
Benchao Li

Re: How to create schema for flexible json data in Flink SQL

Posted by Leonard Xu <xb...@gmail.com>.
Hi, guodong 
 
> I am wondering if Flink SQL can/will support the flexible schema in the future,

It’s an interesting topic, this feature is more close to the scope of schema inference.
The schema inference should come in next few releases. 

Best,
Leonard Xu




> for example, register the table without defining specific schema for each field, to let user define a generic map or array for one field. but the value of map/array can be any object. Then, the type conversion cost might be saved. 
> 
> Guodong
> 
> 
> On Thu, May 28, 2020 at 7:43 PM Benchao Li <libenchao@gmail.com <ma...@gmail.com>> wrote:
> Hi Guodong,
> 
> I think you almost get the answer,
> 1. map type, it's not working for current implementation. For example, use map<varchar, varchar>, if the value if non-string json object, then `JsonNode.asText()` may not work as you wish.
> 2. list all fields you cares. IMO, this can fit your scenario. And you can set format.fail-on-missing-field = true, to allow setting non-existed fields to be null.
> 
> For 1, I think maybe we can support it in the future, and I've created jira[1] to track this.
> 
> [1] https://issues.apache.org/jira/browse/FLINK-18002 <https://issues.apache.org/jira/browse/FLINK-18002>
> Guodong Wang <wanggd04@gmail.com <ma...@gmail.com>> 于2020年5月28日周四 下午6:32写道:
> Hi !
> 
> I want to use Flink SQL to process some json events. It is quite challenging to define a schema for the Flink SQL table. 
> 
> My data source's format is some json like this
> {
>     "top_level_key1": "some value",
>     "nested_object": {
>         "nested_key1": "abc",
>         "nested_key2": 123,
>         "nested_key3": ["element1", "element2", "element3"]
>     }
> }
> 
> The big challenges for me to define a schema for the data source are
> 1. the keys in nested_object are flexible, there might be 3 unique keys or more unique keys. If I enumerate all the keys in the schema, I think my code is fragile, how to handle event which contains more  nested_keys in nested_object ?
> 2. I know table api support Map type, but I am not sure if I can put generic object as the value of the map. Because the values in nested_object are of different types, some of them are int, some of them are string or array.
> 
> So. how to expose this kind of json data as table in Flink SQL without enumerating all the nested_keys?
> 
> Thanks.
> 
> Guodong
> 
> 
> -- 
> 
> Best,
> Benchao Li


Re: How to create schema for flexible json data in Flink SQL

Posted by Guodong Wang <wa...@gmail.com>.
Benchao,

Thank you for your quick reply.

As you mentioned, for current scenario, approach 2 should work for me. But
it is a little bit annoying that I have to modify schema to add new field
types when upstream app changes the json format or adds new fields.
Otherwise, my user can not refer the field in their SQL.

Per description in the jira, I think after implementing this, all the json
values will be converted as strings.
I am wondering if Flink SQL can/will support the flexible schema in the
future, for example, register the table without defining specific schema
for each field, to let user define a generic map or array for one field.
but the value of map/array can be any object. Then, the type conversion
cost might be saved.

Guodong


On Thu, May 28, 2020 at 7:43 PM Benchao Li <li...@gmail.com> wrote:

> Hi Guodong,
>
> I think you almost get the answer,
> 1. map type, it's not working for current implementation. For example, use
> map<varchar, varchar>, if the value if non-string json object, then
> `JsonNode.asText()` may not work as you wish.
> 2. list all fields you cares. IMO, this can fit your scenario. And you can
> set format.fail-on-missing-field = true, to allow setting non-existed
> fields to be null.
>
> For 1, I think maybe we can support it in the future, and I've created
> jira[1] to track this.
>
> [1] https://issues.apache.org/jira/browse/FLINK-18002
>
> Guodong Wang <wa...@gmail.com> 于2020年5月28日周四 下午6:32写道:
>
>> Hi !
>>
>> I want to use Flink SQL to process some json events. It is quite
>> challenging to define a schema for the Flink SQL table.
>>
>> My data source's format is some json like this
>> {
>> "top_level_key1": "some value",
>> "nested_object": {
>> "nested_key1": "abc",
>> "nested_key2": 123,
>> "nested_key3": ["element1", "element2", "element3"]
>> }
>> }
>>
>> The big challenges for me to define a schema for the data source are
>> 1. the keys in nested_object are flexible, there might be 3 unique keys
>> or more unique keys. If I enumerate all the keys in the schema, I think my
>> code is fragile, how to handle event which contains more  nested_keys in
>> nested_object ?
>> 2. I know table api support Map type, but I am not sure if I can put
>> generic object as the value of the map. Because the values in nested_object
>> are of different types, some of them are int, some of them are string or
>> array.
>>
>> So. how to expose this kind of json data as table in Flink SQL without
>> enumerating all the nested_keys?
>>
>> Thanks.
>>
>> Guodong
>>
>
>
> --
>
> Best,
> Benchao Li
>

Re: How to create schema for flexible json data in Flink SQL

Posted by Benchao Li <li...@gmail.com>.
Hi Guodong,

I think you almost get the answer,
1. map type, it's not working for current implementation. For example, use
map<varchar, varchar>, if the value if non-string json object, then
`JsonNode.asText()` may not work as you wish.
2. list all fields you cares. IMO, this can fit your scenario. And you can
set format.fail-on-missing-field = true, to allow setting non-existed
fields to be null.

For 1, I think maybe we can support it in the future, and I've created
jira[1] to track this.

[1] https://issues.apache.org/jira/browse/FLINK-18002

Guodong Wang <wa...@gmail.com> 于2020年5月28日周四 下午6:32写道:

> Hi !
>
> I want to use Flink SQL to process some json events. It is quite
> challenging to define a schema for the Flink SQL table.
>
> My data source's format is some json like this
> {
> "top_level_key1": "some value",
> "nested_object": {
> "nested_key1": "abc",
> "nested_key2": 123,
> "nested_key3": ["element1", "element2", "element3"]
> }
> }
>
> The big challenges for me to define a schema for the data source are
> 1. the keys in nested_object are flexible, there might be 3 unique keys or
> more unique keys. If I enumerate all the keys in the schema, I think my
> code is fragile, how to handle event which contains more  nested_keys in
> nested_object ?
> 2. I know table api support Map type, but I am not sure if I can put
> generic object as the value of the map. Because the values in nested_object
> are of different types, some of them are int, some of them are string or
> array.
>
> So. how to expose this kind of json data as table in Flink SQL without
> enumerating all the nested_keys?
>
> Thanks.
>
> Guodong
>


-- 

Best,
Benchao Li