Posted to user@flink.apache.org by Peter Ertl <pe...@gmx.net> on 2017/08/03 10:41:36 UTC

json mapper

Hi Flink users,

I just wanted to ask whether this kind of Scala map function is correct:

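import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.node.ObjectNode
import org.apache.flink.api.common.functions.MapFunction

// Companion object: holds one ObjectMapper that all JsonMapper instances share
object JsonMapper {
  private val mapper: ObjectMapper = new ObjectMapper()
}

class JsonMapper extends MapFunction[String, ObjectNode] {
  override def map(value: String): ObjectNode = JsonMapper.mapper.readValue(value, classOf[ObjectNode])
}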

Is using a static reference to ObjectMapper fine, or will it cause issues on a distributed cluster, with checkpointing, when serializing state, or in some other way?

Or should I instead use a non-transient field initialized in the constructor (ObjectMapper is java.io.Serializable)?
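A minimal sketch of this constructor-initialized variant, assuming Flink 1.x `MapFunction` and `jackson-databind` on the classpath; the class name `JsonMapperWithField` is hypothetical. Flink functions are `java.io.Serializable`, so a plain field is serialized together with the function when the job is submitted, and each parallel instance ends up with its own deserialized copy of the mapper.

```scala
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.node.ObjectNode
import org.apache.flink.api.common.functions.MapFunction

// Hypothetical name. The ObjectMapper is an ordinary (non-transient) field,
// so it is Java-serialized along with the function; this relies on
// ObjectMapper implementing java.io.Serializable.
class JsonMapperWithField extends MapFunction[String, ObjectNode] {
  private val mapper: ObjectMapper = new ObjectMapper()

  override def map(value: String): ObjectNode =
    mapper.readValue(value, classOf[ObjectNode])
}
```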

Or should I initialize it in RichMapFunction.open() and store it in a transient field?
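A minimal sketch of the open()/transient variant, assuming the Flink 1.x `RichMapFunction.open(Configuration)` signature used at the time of this thread and `jackson-databind` on the classpath; `JsonMapperWithOpen` is a hypothetical name. Because the field is `@transient`, the mapper is never shipped with the function; each parallel instance builds its own before processing the first record.

```scala
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.node.ObjectNode
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

// Hypothetical name. The mapper is excluded from serialization and is
// created once per parallel instance in open().
class JsonMapperWithOpen extends RichMapFunction[String, ObjectNode] {
  @transient private var mapper: ObjectMapper = _

  // Called by Flink once per parallel instance, before the first map() call.
  override def open(parameters: Configuration): Unit = {
    mapper = new ObjectMapper()
  }

  override def map(value: String): ObjectNode =
    mapper.readValue(value, classOf[ObjectNode])
}
```

A commonly used Scala shorthand with the same effect is `@transient private lazy val mapper = new ObjectMapper()` in a plain MapFunction, which creates the mapper on first use after deserialization instead of in open().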

Also, I am wondering whether replacing 'class' with 'object' (i.e. a singleton)

object JsonMapper extends MapFunction[String, ObjectNode] { /* ..*/ }

is OK (the mapper is stateless, so there seems to be no need to instantiate it again and again?)
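The singleton one-liner above, filled in as a sketch that keeps the name JsonMapper and assumes Flink 1.x `MapFunction` and `jackson-databind` on the classpath. A Scala `object` is initialized once per class loader, so every parallel instance running in the same TaskManager JVM under that class loader shares this single mapper; correctness then rests on the mapper being safe for concurrent use.

```scala
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.node.ObjectNode
import org.apache.flink.api.common.functions.MapFunction

// Singleton function: Scala resolves a deserialized reference to this object
// back to the one instance in the JVM, so the mapper below is shared by all
// parallel instances that see the same class loader.
object JsonMapper extends MapFunction[String, ObjectNode] {
  private val mapper: ObjectMapper = new ObjectMapper()

  override def map(value: String): ObjectNode =
    mapper.readValue(value, classOf[ObjectNode])
}
```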

Thanks and best regards
Peter

Re: json mapper

Posted by Eron Wright <er...@gmail.com>.
I think your snippet looks good. The Jackson ObjectMapper is designed to
be shared by many threads and is routinely stored in a static field. It
is also somewhat expensive to create.
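To illustrate the pattern described above, here is a minimal sketch in which one ObjectMapper, held in a Scala `object` (the closest Scala counterpart of a static field), is used concurrently from a thread pool. `SharedMapper` and `parseConcurrently` are hypothetical names, and `jackson-databind` is assumed to be on the classpath. Jackson's ObjectMapper Javadoc states that instances are thread-safe provided all configuration happens before any read or write call, so configure the mapper fully before sharing it; a passing run illustrates the usage but does not by itself prove thread safety.

```scala
import java.util.concurrent.{Callable, Executors}
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.node.ObjectNode

// One mapper for the whole JVM, created (and configured) once up front.
object SharedMapper {
  val mapper: ObjectMapper = new ObjectMapper()
}

object SharedMapperDemo {
  // Parses n small JSON documents on `threads` workers that all use the
  // same mapper; returns the parsed "i" values in submission order.
  def parseConcurrently(n: Int, threads: Int): Seq[Int] = {
    val pool = Executors.newFixedThreadPool(threads)
    try {
      val futures = (0 until n).map { i =>
        pool.submit(new Callable[Int] {
          override def call(): Int =
            SharedMapper.mapper
              .readValue(s"""{"i": $i}""", classOf[ObjectNode])
              .get("i")
              .asInt()
        })
      }
      futures.map(_.get())
    } finally {
      pool.shutdown()
    }
  }
}
```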

Hope this helps,
-Eron

In reply to Nico Kruber <ni...@data-artisans.com>, Thu, Aug 3, 2017 at 7:46 AM.

Re: json mapper

Posted by Nico Kruber <ni...@data-artisans.com>.
Hi Peter,
I'm not a Scala developer, but I may be able to help with some concepts:

* a static reference used inside a [Map]Function will cause problems when
parallel instances run in the same JVM (e.g. a TaskManager with multiple
slots) if this static object is stateful and not thread-safe
* additionally, the parallel instances of your map function do not
necessarily run in the same JVM (they may be spread over multiple
TaskManagers), so you cannot assume that the state of the JsonMapper is
consistent among them
* if the ObjectMapper does not store any state that is worth recovering during
a failure (I see none in
https://fasterxml.github.io/jackson-databind/javadoc/2.3.0/com/fasterxml/jackson/databind/ObjectMapper.html
if that is the one you are using), then you don't need to put it into Flink
state; you can either initialise it as a (non-static) member of your
MapFunction class or even create it inside your map function itself
* for the correct use of keyed/non-keyed state, please refer to my other email 
or [1]
* for 'class' vs. 'object': if you're using
com.fasterxml.jackson.databind.ObjectMapper as described above, you'll have
state again ("It will use instances of JsonParser and JsonGenerator for
implementing actual reading/writing of JSON." from the docs), but in general
it is a good question whether a singleton would work for stateless operators
and whether it would actually improve performance.
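The last option from the list, creating the mapper inside the map function itself, can be sketched as follows, assuming Flink 1.x `MapFunction` and `jackson-databind` on the classpath; `JsonMapperPerRecord` is a hypothetical name. Nothing is shared or serialized, which sidesteps every concurrency question, but a new ObjectMapper is built for each record, and constructing an ObjectMapper is relatively expensive.

```scala
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.node.ObjectNode
import org.apache.flink.api.common.functions.MapFunction

// Hypothetical name. A fresh mapper per record: no shared state at all,
// at the cost of constructing an ObjectMapper on every call.
class JsonMapperPerRecord extends MapFunction[String, ObjectNode] {
  override def map(value: String): ObjectNode = {
    val mapper = new ObjectMapper()
    mapper.readValue(value, classOf[ObjectNode])
  }
}
```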


Nico

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html
