Posted to user@flink.apache.org by Theodor Wübker <th...@inside-m2m.de> on 2022/11/08 13:21:05 UTC

Converting ResolvedSchema to JSON and Protobuf Schemas

Hello,

I have a streaming use case where I execute a query on a Table. I take the ResolvedSchema of the table and convert it to an Avro schema using the AvroSchemaConverter. Now I want to do the same for JSON and Protobuf. However, it seems there is nothing similar to AvroSchemaConverter - I wonder if I have to code the mapping of Flink's DataType to JSON and Protobuf myself, or if I missed something. I would be glad if someone could point me in the right direction here.
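
For reference, the working Avro path is roughly this (a minimal sketch, with resultTable being the Table holding the query result):

import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.types.DataType;

// Resolve the table's schema and hand its logical type to the Avro converter.
ResolvedSchema resultSchema = resultTable.getResolvedSchema();
DataType type = resultSchema.toSinkRowDataType();
org.apache.avro.Schema avroSchema =
    AvroSchemaConverter.convertToSchema(type.getLogicalType());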

Yours sincerely, 
Theo

Re: Converting ResolvedSchema to JSON and Protobuf Schemas

Posted by Theodor Wübker <th...@inside-m2m.de>.
Hey,

thank you for your reply. Your converter looks very interesting. However, Flink already comes with the JsonRowSchemaConverter, which converts a JSONSchema string to a TypeInformation. From there you can convert the TypeInformation to, say, a DataType (although I must admit I only got this done using deprecated methods in Flink). I am struggling to get the reverse direction done - converting from a Flink ResolvedSchema (or LogicalType, or DataType) to a JSONSchema. Is that something you want to implement in your converter as well?

Your project is encouraging me though; maybe I will try to implement DataType to JSONSchema and ProtobufSchema to DataType (and the reverse) myself, if I do not find anything that does the trick.
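
A rough sketch of what the DataType -> JSONSchema direction could look like with Jackson (the type mapping here is one possible choice of mine, not an existing Flink API):

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;

public class DataTypeToJsonSchema {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    public static ObjectNode convert(DataType dataType) {
        return convert(dataType.getLogicalType());
    }

    // Recurse through the logical type and build the JSONSchema as an ObjectNode.
    private static ObjectNode convert(LogicalType type) {
        ObjectNode node = MAPPER.createObjectNode();
        switch (type.getTypeRoot()) {
            case BOOLEAN:
                node.put("type", "boolean");
                break;
            case TINYINT:
            case SMALLINT:
            case INTEGER:
            case BIGINT:
                node.put("type", "integer");
                break;
            case FLOAT:
            case DOUBLE:
            case DECIMAL:
                node.put("type", "number"); // lossy: JSONSchema has no finer numeric types
                break;
            case CHAR:
            case VARCHAR:
                node.put("type", "string");
                break;
            case TIMESTAMP_WITHOUT_TIME_ZONE:
                node.put("type", "string");
                node.put("format", "date-time"); // ISO-8601 string convention
                break;
            case ROW:
                node.put("type", "object");
                ObjectNode props = node.putObject("properties");
                for (RowType.RowField field : ((RowType) type).getFields()) {
                    props.set(field.getName(), convert(field.getType()));
                }
                break;
            case ARRAY:
                node.put("type", "array");
                node.set("items", convert(type.getChildren().get(0)));
                break;
            default:
                throw new UnsupportedOperationException("No JSONSchema mapping for " + type);
        }
        return node;
    }
}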

-Theo


Re: Converting ResolvedSchema to JSON and Protobuf Schemas

Posted by Andrew Otto <ot...@wikimedia.org>.
> meaning that double and integer
I meant to write: "meaning that double and bigint ... "
:)


Re: Converting ResolvedSchema to JSON and Protobuf Schemas

Posted by Andrew Otto <ot...@wikimedia.org>.
> Also thanks for showing me your pattern with the SchemaConversions and
> stuff. Feels pretty clean and worked like a charm :)
Glad to hear it, that is very cool!

> converts number to double always. I wonder, did you make this up?
Yes, we chose that mapping.  We went with number -> double and integer
-> bigint because both of those are wider than their float/int
counterparts, meaning that double and integer will work in more cases.  Of
course, this is not an optimal usage of bits, but at least things won't
break.
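
In code, that widening choice boils down to something like this (a sketch, not our actual implementation):

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;

// Map JSONSchema primitive type names to the widest Flink counterpart.
static DataType widestFlinkType(String jsonSchemaType) {
    switch (jsonSchemaType) {
        case "number":  return DataTypes.DOUBLE();  // wide enough for float and double
        case "integer": return DataTypes.BIGINT();  // wide enough for int and long
        case "boolean": return DataTypes.BOOLEAN();
        case "string":  return DataTypes.STRING();
        default:
            throw new IllegalArgumentException("Unmapped JSONSchema type: " + jsonSchemaType);
    }
}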

> all kinds of fields like double, float, big decimal… they all get mapped
> to number by my converter
It is possible to make some non-JSONSchema convention in the JSONSchema to
map to more specific types.  This is done for example with format:
date-time in our code, to map from an ISO-8601 string to a timestamp.  I
just did a quick google to find some example of someone else already doing
this and found this doc from IBM
<https://www.ibm.com/docs/en/cics-ts/5.3?topic=mapping-json-schema-c-c> saying
they use JSONSchema's format to specify a float, like

  type: number
  format: float

This seems like a pretty good idea to me, and we should probably do this at
WMF too!  However, it would be a custom convention, and not in the
JSONSchema spec itself, so when you convert back to a JSONSchema, you'd
have to codify this convention to do so (and nothing outside of your code
would really respect it).
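
Codified, the convention could look roughly like this (a sketch; only number with format: float is backed by the IBM doc above, anything else would be part of the custom convention):

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;

// Refine the widened default when a "format" hint is present on a "number".
static DataType numberToFlinkType(String format) {
    if ("float".equals(format)) {
        return DataTypes.FLOAT();  // per the format: float convention
    }
    return DataTypes.DOUBLE();     // no hint: fall back to the widest type
}

// The reverse direction would then have to emit the same hint, e.g. for FLOAT:
//   { "type": "number", "format": "float" }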

Re: Converting ResolvedSchema to JSON and Protobuf Schemas

Posted by Theodor Wübker <th...@inside-m2m.de>.
Yes, you are right. Schemas are not so nice in JSON. When implementing and testing my converter from DataType to JsonSchema, I noticed that your converter from JsonSchema to DataType always converts number to double. I wonder, did you make this up? The table that specifies the mapping <https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/json/> only does it for DataType -> JsonSchema.

It's generally unfortunate that JSON schema offers so little possibility to specify type information… now, when I have a Flink DataType with all kinds of fields like double, float, big decimal… they all get mapped to number by my converter - and in return, when I use yours, they are all mapped to a Flink DataType of double again. So I lose a lot of precision.

I guess for my application it would in general be better to use Avro or Protobuf, since they retain a lot more type information when you convert them back and forth…
Also thanks for showing me your pattern with the SchemaConversions and stuff. Feels pretty clean and worked like a charm :)

-Theo


> On 10. Nov 2022, at 15:02, Andrew Otto <ot...@wikimedia.org> wrote:
> 
> >  I find it interesting that the Mapping from DataType to AvroSchema does exist in Flink (see AvroSchemaConverter), but for all the other formats there is no such Mapping, 
> Yah, but I guess for JSON, there isn't a clear 'schema' to be had.  There of course is JSONSchema, but it isn't a real java-y type system; it's just more JSON for which there exist validators.  
> 
> 
> 
> On Thu, Nov 10, 2022 at 2:12 AM Theodor Wübker <theo.wuebker@inside-m2m.de> wrote:
> Great, I will have a closer look at what you sent. Your idea seems very good, it would be a very clean solution to be able to plug in different SchemaConversions that a (Row) DataType can be mapped to. I will probably try to implement it like this. I find it interesting that the mapping from DataType to AvroSchema does exist in Flink (see AvroSchemaConverter), but for all the other formats there is no such mapping. Maybe this would be something that would interest more people, so when I am finished perhaps I can suggest putting the solution into the flink-json and flink-protobuf packages.
> 
> -Theo

Re: Converting ResolvedSchema to JSON and Protobuf Schemas

Posted by Andrew Otto <ot...@wikimedia.org>.
Interesting, yeah I think you'll have to implement code to recurse through
the (Row) DataType and somehow auto generate the JSONSchema you want.

We abstracted the conversions from JSONSchema to other type systems in this
JsonSchemaConverter
<https://gerrit.wikimedia.org/r/plugins/gitiles/wikimedia-event-utilities/+/refs/heads/master/eventutilities/src/main/java/org/wikimedia/eventutilities/core/event/types/JsonSchemaConverter.java>.
There's nothing special going on here, I've seen versions of this schema
conversion code over and over again in different frameworks. This one just
allows us to plug in a SchemaConversions
<https://gerrit.wikimedia.org/r/plugins/gitiles/wikimedia-event-utilities/+/refs/heads/master/eventutilities/src/main/java/org/wikimedia/eventutilities/core/event/types/SchemaConversions.java>
implementation to provide the mappings to the output type system (like the
Flink DataType mappings
<https://gerrit.wikimedia.org/r/plugins/gitiles/wikimedia-event-utilities/+/refs/heads/master/eventutilities-flink/src/main/java/org/wikimedia/eventutilities/flink/formats/json/DataTypeSchemaConversions.java>
I linked to before), rather than hardcoding the output types.

If I were trying to do what you are doing (in our codebase)...I'd create a
Flink DataTypeConverter<T> that iterated through a (Row) DataType and a
SchemaConversions<JsonNode> implementation that mapped to the JsonNode that
represented the JSONSchema.  (If not using Jackson...then you could use
a Java JSON type other than JsonNode).
You could also make a SchemaConversions<ProtobufSchema> (with whatever
Protobuf class to use...I'm not familiar with Protobuf) and then use the
same DataTypeConverter to convert to ProtobufSchema.   AND THEN...I'd
wonder if the input schema recursion code itself could be abstracted too so
that it would work for either JsonSchema OR DataType OR whatever but anyway
that is probably too crazy and too much for what you are doing...but it
would be cool! :p
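
Boiled down, the pattern is something like this (a paraphrased sketch; our actual interfaces cover many more types):

import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;

// One method per type in the source type system; T is the target type system.
interface SchemaConversions<T> {
    T typeString();
    T typeDouble();
    T typeRow(Map<String, T> fields);
}

// A single generic walker over the source schema. Plugging in a
// SchemaConversions<JsonNode> would yield a JSONSchema; a Protobuf-flavored
// implementation would yield a Protobuf schema, without touching the walker.
class DataTypeConverter {
    static <T> T convert(LogicalType type, SchemaConversions<T> conversions) {
        switch (type.getTypeRoot()) {
            case VARCHAR:
                return conversions.typeString();
            case DOUBLE:
                return conversions.typeDouble();
            case ROW:
                Map<String, T> fields = new LinkedHashMap<>();
                for (RowType.RowField f : ((RowType) type).getFields()) {
                    fields.put(f.getName(), convert(f.getType(), conversions));
                }
                return conversions.typeRow(fields);
            default:
                throw new UnsupportedOperationException("Not sketched: " + type);
        }
    }
}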

Re: Converting ResolvedSchema to JSON and Protobuf Schemas

Posted by Theodor Wübker <th...@inside-m2m.de>.
I want to register the result schema in a schema registry, as I am pushing the result data to a Kafka topic. The result schema is not known at compile time, so I need to find a way to compute it at runtime from the resulting Flink schema.
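
For the Avro case, the registration step could look roughly like this, assuming Confluent's schema registry client (the client setup, the subject naming and the outputTopic variable are placeholders; method names follow recent versions of that client):

import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;

// Compute the Avro schema at runtime and register it under the topic's value subject.
SchemaRegistryClient client =
    new CachedSchemaRegistryClient("http://schema-registry:8081", 100);
org.apache.avro.Schema avroSchema = AvroSchemaConverter.convertToSchema(
    resultTable.getResolvedSchema().toSinkRowDataType().getLogicalType());
int schemaId = client.register(outputTopic + "-value", new AvroSchema(avroSchema));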

-Theo

(resent - again sorry, I forgot to add the others in the cc)


Re: Converting ResolvedSchema to JSON and Protobuf Schemas

Posted by Andrew Otto <ot...@wikimedia.org>.
>  I want to convert the schema of a Flink table to both Protobuf *schema* and
JSON *schema*
Oh, you want to convert from Flink Schema TO JSONSchema?  Interesting.
That would indeed be something that is not usually done.  Just curious, why
do you want to do this?


Re: Converting ResolvedSchema to JSON and Protobuf Schemas

Posted by Andrew Otto <ot...@wikimedia.org>.
Hello!

I see you are talking about JSONSchema, not just JSON itself.

We're trying to do a similar thing at Wikimedia and have developed some
tooling around this.

JsonSchemaFlinkConverter
<https://gerrit.wikimedia.org/r/plugins/gitiles/wikimedia-event-utilities/+/refs/heads/master/eventutilities-flink/src/main/java/org/wikimedia/eventutilities/flink/formats/json/JsonSchemaFlinkConverter.java>
has some logic to convert from JSONSchema Jackson ObjectNodes to Flink
Table DataType or Table SchemaBuilder, or Flink DataStream
TypeInformation[Row].  Some of the conversions from JSONSchema to Flink
type are opinionated.  You can see the mappings here
<https://gerrit.wikimedia.org/r/plugins/gitiles/wikimedia-event-utilities/+/refs/heads/master/eventutilities-flink/src/main/java/org/wikimedia/eventutilities/flink/formats/json/DataTypeSchemaConversions.java>.

Re: Converting ResolvedSchema to JSON and Protobuf Schemas

Posted by Theodor Wübker <th...@inside-m2m.de>.
Thanks for your reply Yaroslav! The way I do it with Avro seems similar to what you pointed out:

ResolvedSchema resultSchema = resultTable.getResolvedSchema();
DataType type = resultSchema.toSinkRowDataType();
org.apache.avro.Schema converted = AvroSchemaConverter.convertToSchema(type.getLogicalType());

I mentioned the ResolvedSchema because it is my starting point after the SQL operation. It seemed to me that I cannot retrieve anything with more schema information from the table, so I went with this. About your other answers: it seems the classes you mentioned can be used to serialize actual data? However, this is not quite what I want to do.
Essentially I want to convert the schema of a Flink table to both a Protobuf schema and a JSON schema (for Avro, as you can see, I have it already). It seems odd that this is not easily possible, because converting from a JSON schema to a Flink Schema is possible using the JsonRowSchemaConverter. However, the other way does not seem to be implemented. This is how I got a Table Schema (that I can use in a table descriptor) from a JSON schema:

TypeInformation<Row> type = JsonRowSchemaConverter.convert(json);
DataType row = TableSchema.fromTypeInfo(type).toPhysicalRowDataType();
Schema schema = Schema.newBuilder().fromRowDataType(row).build();
Sidenote: I use deprecated methods here, so if there is a better approach please let me know! But it shows that in Flink it's easily possible to create a Schema for a TableDescriptor from a JSON Schema - the other way is just not so trivial, it seems. And for Protobuf I have no solution so far, not even for creating a Flink Schema from a Protobuf schema - not to mention the other way around.

-Theo

(resent because I accidentally only responded to you, not the Mailing list - sorry)


Re: Converting ResolvedSchema to JSON and Protobuf Schemas

Posted by Yaroslav Tkachenko <ya...@goldsky.com>.
Got it. I'm not sure why you mentioned ResolvedSchema in the first place;
usually in the Table API you work with RowType / RowData.

- For Avro I use the AvroSchemaConverter.convertToSchema method to get an Avro
schema from a RowType, then
use org.apache.flink.formats.avro.RowDataToAvroConverters to serialize
RowData as Avro's GenericRecord (see the sketch after this list).
- For JSON you can create
org.apache.flink.formats.json.JsonRowDataSerializationSchema that takes
RowType as a parameter, then serialize RowData as a JSON payload.
- I haven't used Protobuf with Flink, but it looks like Flink 1.16 added
quite a bit of support for Protobuf. It seems like you can similarly
create org.apache.flink.formats.protobuf.serialize.PbRowDataSerializationSchema
that takes RowType as a parameter, then serialize RowData as a Protobuf
payload.
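
A sketch of that Avro route, assuming a RowType rowType and a RowData rowData are in scope:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.formats.avro.RowDataToAvroConverters;
import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;

// Derive the Avro schema from the row type, then convert individual records.
Schema avroSchema = AvroSchemaConverter.convertToSchema(rowType);
RowDataToAvroConverters.RowDataToAvroConverter converter =
    RowDataToAvroConverters.createConverter(rowType);
GenericRecord record = (GenericRecord) converter.convert(avroSchema, rowData);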

On Tue, Nov 8, 2022 at 7:47 AM Theodor Wübker <th...@inside-m2m.de>
wrote:

> Hey,
>
> yes I have. I actually use these packages as well. However, I am quite new
> to Flink and the Flink type system in the Table API. Searching these
> packages did not turn up anything. My understanding of Protobuf in
> particular is not great, so it's very possible I missed something
> (possibly a complex way of doing it). I am relatively certain, though, that
> there is nothing that does the trick as straightforwardly as the
> AvroSchemaConverter does for Avro in the flink-avro package.
>
> -Theo

Re: Converting ResolvedSchema to JSON and Protobuf Schemas

Posted by Yaroslav Tkachenko <ya...@goldsky.com>.
Hey Theo, have you looked at the flink-json and flink-protobuf packages?
