Posted to user@flink.apache.org by Jack Huang <ja...@mz.com> on 2016/08/04 01:56:58 UTC

Parsing source JSON String as Scala Case Class

Hi all,

I want to read a source of JSON strings as Scala case classes. I don't want to
have to write a serde for every case class I have. The idea is:

    val events = env.addSource(new FlinkKafkaConsumer09[Event]("events", new JsonSerde(classOf[Event]), kafkaProp))

I tried implementing my own JsonSerde with Jackson and with Gson, but in both
cases I get the error

Task not serializable
    org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:172)
    org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:164)
    org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.scalaClean(StreamExecutionEnvironment.scala:568)
    org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.scala:498)
    com.fractionalmedia.stream.PricerEvent$.main(PricerEvent.scala:100)

It seems that both Jackson and Gson have classes that are not serializable.
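To see why the error appears, the sketch below mimics, in a simplified way, the check at the top of the stack trace (ClosureCleaner.ensureSerializable): it tries to Java-serialize an object and reports whether that worked. ThirdPartyParser is a hypothetical stand-in for whatever class the JSON library contributes that does not implement java.io.Serializable. EagerSchema and SerializationCheck are likewise illustrative names, not Flink, Jackson or Gson APIs.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a library parser class that does NOT
// implement java.io.Serializable.
class ThirdPartyParser {
  def parse(line: String): Array[String] = line.split(",")
}

// Illustrative schema-like wrapper that creates the parser eagerly,
// so the non-serializable object is a field at serialization time.
class EagerSchema extends Serializable {
  val parser = new ThirdPartyParser
}

object SerializationCheck {
  // Simplified serializability check: attempt plain Java serialization
  // and report whether it succeeded.
  def isSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit =
    println(isSerializable(new EagerSchema)) // false
}
```

Serializing EagerSchema walks into its parser field, and because ThirdPartyParser is not serializable, writeObject throws NotSerializableException.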

I couldn't find any other solution to perform this JSON-to-Case-Class
parsing, yet it seems a very basic need. What am I missing?


Thanks,
Jack

Re: Parsing source JSON String as Scala Case Class

Posted by Jack Huang <ja...@mz.com>.
Thanks, Stephan. "lazy val" does the trick.

On Thu, Aug 4, 2016 at 2:33 AM, Stephan Ewen <se...@apache.org> wrote:

> If the class has non-serializable members, you need to initialize them
> "lazily" when the objects are already in the distributed execution (after
> serializing / distributing them).
>
> Making a Scala 'val' a 'lazy val' often does the trick (at minimal
> performance cost).

Re: Parsing source JSON String as Scala Case Class

Posted by Stephan Ewen <se...@apache.org>.
If the class has non-serializable members, you need to initialize them
"lazily" when the objects are already in the distributed execution (after
serializing / distributing them).

Making a Scala 'val' a 'lazy val' often does the trick (at minimal
performance cost).
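The "lazy val" suggestion can be illustrated without Flink or a JSON library. The following is a minimal sketch under several assumptions. LineParser is a hypothetical non-serializable stand-in for the JSON library's parser, and Event is a hypothetical case class. LazySchema.deserialize(line: String) is only loosely modeled on a deserialization schema and is not Flink's actual interface. roundTrip uses plain Java serialization as a rough stand-in for Flink shipping the schema to the workers. Because parser is a lazy val, no LineParser exists when the schema is serialized; it is created on first use after deserialization.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for a library parser class that does NOT
// implement java.io.Serializable.
class LineParser {
  def parse(line: String): Array[String] = line.split(",")
}

// Hypothetical event type used only for this illustration.
case class Event(id: String, price: Double)

// The parser is a lazy val: it is not created in the constructor, so the
// uninitialized field serializes as null. It is built on first access,
// i.e. after the object has been shipped and deserialized.
class LazySchema extends Serializable {
  lazy val parser: LineParser = new LineParser

  def deserialize(line: String): Event = {
    val fields = parser.parse(line)
    Event(fields(0), fields(1).toDouble)
  }
}

object LazyValDemo {
  // Serialize and deserialize with plain Java serialization, standing in
  // for the framework distributing the object to a worker.
  def roundTrip[A](obj: A): A = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[A] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val shipped = roundTrip(new LazySchema) // succeeds: parser not yet created
    println(shipped.deserialize("e1,9.5"))  // Event(e1,9.5)
  }
}
```

The trick relies on nothing touching parser before the object is serialized. Once the lazy val has been forced, the field holds a LineParser and serialization fails again.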
