Posted to user@flink.apache.org by Georg Heiler <ge...@gmail.com> on 2020/07/09 16:23:45 UTC

map JSON to scala case class & off-heap optimization

Hi,

I want to map a stream of JSON documents from Kafka to a Scala case class.
How can this be accomplished using the JSONKeyValueDeserializationSchema? Is
a manual mapping of object nodes required?

I have a Spark background. There, such manual mappings are usually
discouraged. Instead, Spark offers a nice API (the Dataset API) to perform
this kind of mapping:
1) it is concise
2) it operates on Spark's off-heap memory representation (Tungsten), which
makes it faster

In Flink, by contrast, such off-heap optimizations do not seem to be
discussed much (sorry if I am missing something; I am a Flink newbie). Is
there a reason why these optimizations are not necessary in Flink?


How could I get the following example:
val serializer = new JSONKeyValueDeserializationSchema(false)
val stream = senv.addSource(
    new FlinkKafkaConsumer(
      "tweets-raw-json",
      serializer,
      properties
    ).setStartFromEarliest() // TODO experiment with different start values
  )

to map to this Tweet class concisely, i.e. without manually iterating
through all the attribute fields and parsing the keys from the object node
tree.

final case class Tweet(
  tweet_id: Option[String], text: Option[String], source: Option[String],
  geo: Option[String], place: Option[String], lang: Option[String],
  created_at: Option[String], timestamp_ms: Option[String],
  coordinates: Option[String], user_id: Option[Long],
  user_name: Option[String], screen_name: Option[String],
  user_created_at: Option[String], followers_count: Option[Long],
  friends_count: Option[Long], user_lang: Option[String],
  user_location: Option[String], hashtags: Option[Seq[String]]
)

Best,
Georg

Re: map JSON to scala case class & off-heap optimization

Posted by Georg Heiler <ge...@gmail.com>.
Many thanks!

On Wed, 15 July 2020 at 15:58, Aljoscha Krettek <aljoscha@apache.org> wrote:

> [...]

Re: map JSON to scala case class & off-heap optimization

Posted by Aljoscha Krettek <al...@apache.org>.
On 11.07.20 10:31, Georg Heiler wrote:
> 1) similarly to spark the Table API works on some optimized binary
> representation
> 2) this is only available in the SQL way of interaction - there is no
> programmatic API

Yes, it is available from SQL, but also from the Table API, which is a
programmatic, declarative API similar to Spark's Structured Streaming.


> q1) I have read somewhere (I think in some Flink Forward presentations)
> that the SQL API is not necessarily stable with regards to state - even
> with small changes to the DAG (due to optimization). So does this also
> /still apply to the table API? (I assume yes)

Yes, unfortunately this is correct. Because the Table API/SQL is
declarative, users don't have control over the DAG and the state that the
operators have. Some work is planned to at least make sure that the
optimizer stays stable between Flink versions, or to let users pin a
certain physical graph of a query so that it can be reused across
versions.

> q2) When I use the DataSet/Stream (classical scala/java) API it looks like
> I must create a custom serializer if I want to handle one/all of:
> 
>    - side-output failing records and not simply crash the job
>    - as asked before automatic serialization to a scala (case) class

This is true, yes.

> But I also read that creating the ObjectMapper (i.e. in Jackson terms)
> inside the map function is not recommended. From Spark I know that there is
> a map-partitions function, i.e. something where a database connection can
> be created and then reused for the individual elements. Is a similar
> construct available in Flink as well?

Yes, for this you can use "rich functions", which have open() and close()
methods that allow initializing and reusing resources across
invocations:
https://ci.apache.org/projects/flink/flink-docs-master/dev/user_defined_functions.html#rich-functions
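
The open() pattern described above can be sketched roughly as follows. This
is only an illustration, assuming the Flink 1.11-era open(Configuration)
signature, a stream of raw JSON strings, and jackson-module-scala on the
classpath; the class name ParseTweet and the trimmed-down Tweet type are
made up for the example.

```scala
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

// Hypothetical, trimmed-down record type (not the full Tweet class of the thread).
final case class Tweet(tweet_id: Option[String], text: Option[String])

// Hypothetical rich function: the mapper is built once per parallel task
// in open(), then reused for every record passed to map().
class ParseTweet extends RichMapFunction[String, Tweet] {
  // @transient: the mapper is not shipped with the function, it is
  // re-created on the worker when open() runs.
  @transient private var mapper: ObjectMapper = _

  override def open(parameters: Configuration): Unit = {
    mapper = new ObjectMapper()
    mapper.registerModule(DefaultScalaModule)
    // Ignore JSON attributes that the case class does not declare.
    mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
  }

  override def map(json: String): Tweet =
    mapper.readValue(json, classOf[Tweet])
}
```

With the Scala DataStream API this would be applied as
strings.map(new ParseTweet), with org.apache.flink.streaming.api.scala._
imported so that the type information for Tweet can be derived. Note that a
malformed record makes map() throw, which fails the job.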

> Also, I have read a lot of articles and it looks like a lot of people
> are using the String serializer and then manually parse the JSON which also
> seems inefficient.
> Where would I find an example for some Serializer with side outputs for
> failed records as well as efficient initialization using some similar
> construct to map-partitions?

I'm not aware of such examples, unfortunately.
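
For what it is worth, Flink's side outputs can be combined with the open()
pattern. The following is a minimal sketch, not a tested production
serializer, assuming the Flink 1.11-era Scala DataStream API and plain
Jackson; the names ParseOrSideOutput, tweetId and split, the tag id
"failed-json", and the choice to emit only the tweet_id are all invented for
illustration.

```scala
import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
import scala.util.{Failure, Success, Try}

object ParseOrSideOutput {
  // Tag identifying the side output for unusable records; the id is arbitrary.
  val failed: OutputTag[String] = OutputTag[String]("failed-json")

  // Pure helper: extract tweet_id, or fail if the record is not valid JSON
  // or has no such attribute.
  def tweetId(mapper: ObjectMapper, raw: String): Try[String] =
    Try(mapper.readTree(raw).get("tweet_id").asText())

  // Wiring: returns (successfully parsed ids, raw records that failed).
  def split(raw: DataStream[String]): (DataStream[String], DataStream[String]) = {
    val ok = raw.process(new ParseOrSideOutput)
    (ok, ok.getSideOutput(failed))
  }
}

class ParseOrSideOutput extends ProcessFunction[String, String] {
  // Built once per parallel task in open(), reused for every element.
  @transient private var mapper: ObjectMapper = _

  override def open(parameters: Configuration): Unit =
    mapper = new ObjectMapper()

  override def processElement(
      raw: String,
      ctx: ProcessFunction[String, String]#Context,
      out: Collector[String]): Unit =
    ParseOrSideOutput.tweetId(mapper, raw) match {
      case Success(id) => out.collect(id)
      // Route the raw record to the side output instead of failing the job.
      case Failure(_)  => ctx.output(ParseOrSideOutput.failed, raw)
    }
}
```

The failed stream can then be written to a separate sink (for example a
dead-letter topic) while the main stream continues.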

I hope that at least some answers will be helpful!

Best,
Aljoscha

Re: map JSON to scala case class & off-heap optimization

Posted by Georg Heiler <ge...@gmail.com>.
Hi,


Many thanks.
So do I understand correctly that:

1) similarly to Spark, the Table API works on an optimized binary
representation
2) this is only available in the SQL way of interaction - there is no
programmatic API

This leads me then to some questions:

q1) I have read somewhere (I think in some Flink Forward presentations)
that the SQL API is not necessarily stable with regard to state, even
with small changes to the DAG (due to optimization). Does this also
(still) apply to the Table API? (I assume yes.)
q2) When I use the DataSet/DataStream (classical Scala/Java) API, it looks
like I must create a custom serializer if I want to handle one or all of:

  - side-output failing records and not simply crash the job
  - as asked before automatic serialization to a scala (case) class

q3) As asked before: I also read that creating the ObjectMapper (in Jackson
terms) inside the map function is not recommended. From Spark I know that
there is a map-partitions function, i.e. something where a database
connection can be created and then reused for the individual elements. Is a
similar construct available in Flink as well?

Also, I have read a lot of articles, and it looks like many people use the
String serializer and then manually parse the JSON, which also seems
inefficient. Where would I find an example of a serializer with side
outputs for failed records, as well as efficient initialization using a
construct similar to map-partitions?

Best,
Georg

On Fri, 10 July 2020 at 16:22, Aljoscha Krettek <aljoscha@apache.org> wrote:

> [...]

Re: map JSON to scala case class & off-heap optimization

Posted by Aljoscha Krettek <al...@apache.org>.
Hi Georg,

I'm afraid the other suggestions are missing the point a bit. From your 
other emails it seems you want to use Kafka with JSON records together 
with the Table API/SQL. For that, take a look at [1] which describes how 
to define data sources for the Table API. Especially the Kafka and JSON 
sections should be relevant.

That first link is for the legacy connector API. There is a newer API
with slightly different properties, which will allow us to do
optimizations such as working on binary data throughout the stack: [2].
Unfortunately, there is no programmatic API for it yet; you would have to
use `TableEnvironment.executeSql()` to execute SQL DDL that defines your
sources. There is a FLIP for adding the programmatic API: [3]

Best,
Aljoscha

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connect.html

[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/

[3] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-129%3A+Refactor+Descriptor+API+to+register+connectors+in+Table+API
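
The DDL route described in this message could look roughly like the sketch
below. It assumes the Flink 1.11 Kafka SQL connector and JSON format are on
the classpath when the table is actually queried; the broker address, group
id, object name, and the reduced column list are placeholders chosen for
illustration, not values from this thread.

```scala
import org.apache.flink.table.api.{EnvironmentSettings, Table, TableEnvironment}

object KafkaJsonTable {
  val settings: EnvironmentSettings =
    EnvironmentSettings.newInstance().inStreamingMode().build()
  val tEnv: TableEnvironment = TableEnvironment.create(settings)

  // Register a Kafka-backed table; the JSON format maps top-level JSON
  // attributes to columns by name, so no manual field mapping is written.
  tEnv.executeSql(
    """CREATE TABLE tweets (
      |  tweet_id STRING,
      |  lang STRING,
      |  followers_count BIGINT,
      |  hashtags ARRAY<STRING>
      |) WITH (
      |  'connector' = 'kafka',
      |  'topic' = 'tweets-raw-json',
      |  'properties.bootstrap.servers' = 'localhost:9092',
      |  'properties.group.id' = 'tweets-reader',
      |  'scan.startup.mode' = 'earliest-offset',
      |  'format' = 'json',
      |  'json.ignore-parse-errors' = 'true'
      |)""".stripMargin)

  // Only planned when called; this is where the connector jars are needed.
  def englishTweets: Table =
    tEnv.sqlQuery("SELECT tweet_id, followers_count FROM tweets WHERE lang = 'en'")
}
```

With 'json.ignore-parse-errors' set to 'true', malformed records are skipped
rather than failing the job; they are not routed anywhere, so this is not a
replacement for side outputs.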

On 10.07.20 05:01, Aaron Levin wrote:
> [...]


Re: map JSON to scala case class & off-heap optimization

Posted by Aaron Levin <aa...@stripe.com>.
Hi Georg, you can try using the circe library for this, which has a way to
automatically generate JSON decoders for Scala case classes.

As was mentioned earlier, Flink does not come packaged with
JSON-decoding generators for Scala like Spark does.
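
As a rough illustration of the circe suggestion, assuming the circe-parser
and circe-generic modules are available; the trimmed-down Tweet type, the
object name, and the sample document are made up for the example:

```scala
import io.circe.generic.auto._
import io.circe.parser.decode

object CirceExample {
  // Hypothetical subset of the Tweet fields from the original post.
  final case class Tweet(
    tweet_id: Option[String],
    text: Option[String],
    user_id: Option[Long],
    hashtags: Option[Seq[String]]
  )

  val json: String =
    """{"tweet_id":"42","text":"hello","user_id":7,"hashtags":["flink","scala"]}"""

  // The decoder for Tweet is derived at compile time by the auto import;
  // no per-field mapping code is written by hand.
  val result: Either[io.circe.Error, Tweet] = decode[Tweet](json)

  // Malformed input yields a Left instead of throwing an exception.
  val broken: Either[io.circe.Error, Tweet] = decode[Tweet]("not json")
}
```

Because decoding returns an Either, the Left case is a natural fit for
routing failed records elsewhere instead of crashing the job.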

On Thu, Jul 9, 2020 at 4:45 PM Georg Heiler <ge...@gmail.com>
wrote:

> [...]

Re: map JSON to scala case class & off-heap optimization

Posted by Georg Heiler <ge...@gmail.com>.
Great. Thanks.
But would it be possible to automate this, i.e. to have this work
automatically for the case class / product?

On Thu, 9 July 2020 at 20:21, Taher Koitawala <taherk77@gmail.com> wrote:

> [...]

Re: map JSON to scala case class & off-heap optimization

Posted by Taher Koitawala <ta...@gmail.com>.
The performant way would be to apply a map function over the stream and
then use the Jackson ObjectMapper to convert to Scala objects. In Flink
there is no API like Spark's to automatically get all fields.
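
A small sketch of the Jackson conversion step on its own, assuming
jackson-module-scala is available. It shows that an already parsed tree can
be bound to a case class with treeToValue, so the input does not have to be
a string. The trimmed-down Tweet type and the sample record are invented;
the record only mimics the "key"/"value" layout produced by
JSONKeyValueDeserializationSchema.

```scala
import com.fasterxml.jackson.databind.{DeserializationFeature, JsonNode, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule

object JacksonTreeExample {
  // Hypothetical subset of the Tweet fields from the original post.
  final case class Tweet(
    tweet_id: Option[String],
    lang: Option[String],
    hashtags: Option[Seq[String]]
  )

  val mapper = new ObjectMapper()
  mapper.registerModule(DefaultScalaModule)
  // Ignore JSON attributes that the case class does not declare.
  mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)

  // Stand-in for one consumed record: the payload sits under "value".
  val record: JsonNode = mapper.readTree(
    """{"key":null,"value":{"tweet_id":"42","lang":"en","hashtags":["flink"],"extra":1}}""")

  // Bind the "value" subtree to the case class without per-field code.
  val tweet: Tweet = mapper.treeToValue(record.get("value"), classOf[Tweet])
}
```

One caveat, as far as I can tell: the ObjectNode returned by Flink's
JSONKeyValueDeserializationSchema comes from Flink's shaded Jackson package,
so it cannot be passed directly to an unshaded mapper like the one above.
Re-serializing the node with toString and calling readValue on the result is
one workaround, at the cost of an extra parse.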

On Thu, Jul 9, 2020, 11:38 PM Georg Heiler <ge...@gmail.com>
wrote:

> [...]

Re: map JSON to scala case class & off-heap optimization

Posted by Georg Heiler <ge...@gmail.com>.
How can I use it with a Scala case class?
If I understand correctly, for better performance the ObjectMapper is
already initialized in each KafkaConsumer and returns ObjectNodes. So
I should probably rephrase: how can I then map these to case classes
without hand-coding it? https://github.com/json4s/json4s and
https://github.com/FasterXML/jackson-module-scala both only seem to consume
strings.

Best,
Georg

On Thu, 9 July 2020 at 19:17, Taher Koitawala <taherk77@gmail.com> wrote:

> [...]

Re: map JSON to scala case class & off-heap optimization

Posted by Taher Koitawala <ta...@gmail.com>.
You can try Jackson's ObjectMapper; that will get you from
JSON to an object.

Regards,
Taher Koitawala

On Thu, Jul 9, 2020, 9:54 PM Georg Heiler <ge...@gmail.com> wrote:

> [...]