Posted to user@flink.apache.org by Andrew Otto <ot...@wikimedia.org> on 2022/06/16 18:25:56 UTC

Flink, JSON, and JSONSchemas

At the Wikimedia Foundation, we use JSON and JSONSchemas for our events in
Kafka.  There are hundreds of these schemas and topics in Kafka.  I'd like
to provide library level integration between our 'Event Platform' JSON data
and Flink.  My main goal:

*No case classes or POJOs.*  The JSONSchemas should be enough.

I can actually do this pretty easily with the Table API. I can convert from
JSONSchema to a DataType, and then create a table with that DataType and
format('json').
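
To illustrate, here's a minimal sketch of that Table API path. The inlined
DataType stands in for whatever our JSONSchema -> DataType converter would
produce, and the field names and Kafka options are made up:

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.types.DataType;

public class JsonSchemaTableSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
            EnvironmentSettings.newInstance().inStreamingMode().build());

        // Stand-in for the JSONSchema -> DataType conversion.
        DataType rowType = DataTypes.ROW(
            DataTypes.FIELD("meta", DataTypes.ROW(
                DataTypes.FIELD("dt", DataTypes.STRING()))),
            DataTypes.FIELD("page_id", DataTypes.BIGINT()));

        // Kafka-backed table whose schema comes from the DataType and
        // whose payload is decoded with format('json').
        tEnv.createTemporaryTable("events", TableDescriptor.forConnector("kafka")
            .schema(Schema.newBuilder().fromRowDataType(rowType).build())
            .format("json")
            .option("topic", "my_topic")
            .option("properties.bootstrap.servers", "localhost:9092")
            .option("scan.startup.mode", "earliest-offset")
            .build());
    }
}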

I'd like to be able to do the same for the DataStream API.  From what I can
tell, to do this I should be using a Row
<https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/types/Row.html>
as the record type.  I can also convert from JSONSchema to
TypeInformation<Row> pretty easily, using the Types factory
<https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/common/typeinfo/Types.html>
.
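
For example (illustrative field names only), the converter would emit
something like:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.types.Row;

// Named row type for a schema with a nested string 'meta.dt' and an
// integer 'page_id' (names invented for this example).
TypeInformation<Row> typeInfo = Types.ROW_NAMED(
    new String[] {"meta", "page_id"},
    Types.ROW_NAMED(new String[] {"dt"}, Types.STRING),
    Types.LONG);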

While I can convert between
<https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/data_stream_api/>
the Table API and DataStream<Row>, it seems directly using DataStream<Row>
of our JSON could be pretty useful, and would make it possible to use Flink
without instantiating a StreamTableEnvironment or requiring a 'table
planner'.  Also, to convert back up to the Table API from a
DataStream<Row>, I need the explicit TypeInformation<Row>, which I need to
manually construct.
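
Concretely, the round trip looks something like this (a sketch; 'typeInfo'
is the hand-built TypeInformation<Row> from above and 'events' is assumed
to be an already-registered table):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

// Table -> DataStream needs no extra type information...
DataStream<Row> stream = tEnv.toDataStream(tEnv.from("events"));

// ...but after a transformation the row type can no longer be inferred,
// so going back to the Table API needs the explicit TypeInformation<Row>.
DataStream<Row> transformed = stream
    .map(row -> row)       // some per-record transformation
    .returns(typeInfo);

Table back = tEnv.fromDataStream(transformed);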

Ah but, JsonRowDeserializationSchema
<https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.html>
is
deprecated. Okay, fine I can copy it into my code and modify it for my
purposes.  But even if I do, I'm confused about something else:

DeserializationSchema is not Table API specific (e.g. it can be used as the
value deserializer in KafkaSource).  Row is also not Table API specific
(although I know the main purpose is to bridge Table to DataStream API).
However, it seems that constructing a Source using DeserializationSchema is
not really that common?  KafkaSource uses it, but FileSource and
env.fromElements don't?  I'm trying to write integration tests for this
that use the DataStream API.
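
For reference, the KafkaSource usage I mean looks like this (placeholder
topic and servers; 'jsonRowDeserializer' is a DeserializationSchema<Row>):

import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.types.Row;

// KafkaSource accepts any DeserializationSchema as its value deserializer.
KafkaSource<Row> source = KafkaSource.<Row>builder()
    .setBootstrapServers("localhost:9092")
    .setTopics("my_topic")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(jsonRowDeserializer)
    .build();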

*tl;dr questions:*

*1. Is there some easy way to use deserialized JSON in DataStream without
case classes or POJOs?*

*2. How can I use a DeserializationSchema<Row> to get a DataStream<Row> or
even DataStreamSource<Row> in a unit test from either a file or
String[]/byte[] of serialized JSON?*

Thank you!

Re: Flink, JSON, and JSONSchemas

Posted by Andrew Otto <ot...@wikimedia.org>.
> > *1. Is there some easy way to use deserialized JSON in DataStream
without case classes or POJOs?*
> Could you explain what you expected? Do you mean you want to just
register a DataType that is able to bridge the received bytes to POJO
objects?
No, heh, I don't want to have any POJOs.  I don't want users to have
to write hardcoded Java classes for our canonical JSONSchemas.  I want
someone to be able to use JSON data in Flink that we know conforms to a
JSONSchema, with types that map cleanly to Java types (i.e. no random
additionalProperties or $refs), without hardcoding any duplicate
information about that data that can be retrieved via our other
internal APIs.  (We have an HTTP 'schema registry' for JSONSchemas.)

Row (and RowData) can do this;  I just want to use them easily with JSON in
a DataStream.

> *> 2. How can I use a DeserializationSchema<Row> to get a DataStream<Row>
or even DataStreamSource<Row> in a unit test from either a file or
String[]/byte[] of serialized JSON?*
> For DeserializationSchema<RowData>, you can refer to the Kafka
connector[2]. I think it should be similar for
DeserializationSchema<Row>.

JsonRowDeserializationSchema is marked as deprecated and recommends using
the Table API instead.  I can do that, but it feels like overkill for just
wanting a DataStream<Row>.  I was trying to get away with starting and
ending with the Table API, where I can easily use DataType and RowData,
but if I do some map transformations on the DataStream<Row> to produce a
new stream, I need an explicitly declared TypeInformation<Row> that
matches the new stream's schema when converting back into the Table API.
If I need to declare the output TypeInformation<Row> explicitly anyway, I
might as well just start with TypeInformation<Row> in the first place, and
stay in DataStream world the whole time.


FWIW, I think I've been able to accomplish what I was trying to do in this
patch <https://gerrit.wikimedia.org/r/c/wikimedia-event-utilities/+/806319>.
Still needs some testing, but I've written my own JSONSchema ->
TypeInformation<Row> converter, and have copy/pasted and modified Flink's
deprecated JsonRowDeserializationSchema into our code.
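
In case it's useful to anyone, the unit test wiring ends up looking roughly
like this (a sketch; 'deser' and 'typeInfo' stand for the copied
DeserializationSchema<Row> and the converter output):

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Deserialize raw JSON bytes up front, then build a test stream with the
// explicit TypeInformation<Row>; no POJO is ever needed.
static DataStreamSource<Row> streamOf(
        StreamExecutionEnvironment env,
        DeserializationSchema<Row> deser,
        TypeInformation<Row> typeInfo,
        byte[][] serializedJson) throws IOException {
    List<Row> rows = new ArrayList<>();
    for (byte[] message : serializedJson) {
        rows.add(deser.deserialize(message));
    }
    return env.fromCollection(rows, typeInfo);
}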


Thank you for your responses!
-Andrew Otto
 Wikimedia Foundation




Re: Flink, JSON, and JSONSchemas

Posted by Shengkai Fang <fs...@gmail.com>.
Hi.

> *1. Is there some easy way to use deserialized JSON in DataStream without
case classes or POJOs?*

Could you explain what you expected? Do you mean you want to just register
a DataType that is able to bridge the received bytes to POJO objects? I
am not sure whether the current RAW type[1] in Flink Table is enough for
you.
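
For example, something like this, where MyEvent is a hypothetical class
(RAW carries the value as opaque serialized bytes, so its fields are not
directly queryable):

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;

// RAW wraps an arbitrary Java type as a single opaque Table column.
DataType raw = DataTypes.RAW(TypeInformation.of(MyEvent.class));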

*> 2. How can I use a DeserializationSchema<Row> to get a DataStream<Row>
or even DataStreamSource<Row> in a unit test from either a file or
String[]/byte[] of serialized JSON?*

For DeserializationSchema<RowData>, you can refer to the Kafka
connector[2]. I think it should be similar for
DeserializationSchema<Row>.

Best,
Shengkai

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/types/
[2]
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java#L234


