Posted to user@flink.apache.org by Lars Skjærven <la...@gmail.com> on 2021/12/07 09:03:10 UTC

Scala Case Class Serialization

Hello,
We're running Flink 1.14 with Scala, and we suspect that performance is
suffering due to the serialization of some Scala case classes. Specifically,
we're seeing that our case class "cannot be used as a POJO type because not
all fields are valid POJO fields, and must be processed as GenericType",
and that the case class "does not contain a setter for field X". I'm
interpreting these log messages as performance warnings.

A simple example of a case class that we write to state and that triggers
the mentioned 'warnings':
case class Progress(position: Int, eventTime: Int, alive: Boolean)

My understanding of the docs is that case classes with primitive types
should be supported "out of the box".

Any tips on how to proceed?

Kind regards,
Lars

Re: Scala Case Class Serialization

Posted by Roman Grebennikov <gr...@dfdx.me>.
Hi,

I guess the problematic line, where the Kryo fallback happens, is this one:

  lazy val myState: MapState[String, TestCaseClass] = getRuntimeContext.getMapState(
    new MapStateDescriptor[String, TestCaseClass]("test-state", classOf[String], ttestclass.getTypeClass)
  )

MapStateDescriptor has multiple constructors, and some of them have a strong Java smell :)
The one you've used here, with classOf[String], passes a Class instance into the Java constructor, which implicitly uses Java TypeInformation derivation under the hood, and that derivation has no idea about Scala.

MapStateDescriptor also has a constructor that takes explicit TypeInformation for key and value, like this:
val keyTypeInfo = createTypeInformation[String]
val valueTypeInfo = createTypeInformation[TestCaseClass]
new MapStateDescriptor[String,TestCaseClass]("test", keyTypeInfo, valueTypeInfo)

Then it won't try to be too smart: it won't derive type information from a Class[_], and will just use the instances you provided.
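
Applied to your reproducer, the state declaration could then look like this (just a sketch along the lines above, not tested):

  // Derive both TypeInformation instances with the Scala macro from
  // "import org.apache.flink.api.scala._" and hand them to the descriptor
  // explicitly, so the Java-side type extraction never runs.
  val keyTypeInfo: TypeInformation[String] = createTypeInformation[String]
  val valueTypeInfo: TypeInformation[TestCaseClass] = createTypeInformation[TestCaseClass]

  lazy val myState: MapState[String, TestCaseClass] = getRuntimeContext.getMapState(
    new MapStateDescriptor[String, TestCaseClass]("test-state", keyTypeInfo, valueTypeInfo)
  )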

with best regards,
Roman Grebennikov | grv@dfdx.me


On Tue, Dec 7, 2021, at 19:05, Lars Skjærven wrote:
> Thanks for the quick response. Please find below a minimal example illustrating the issue. I've added implicit TypeInformation and checked that I'm importing the Scala variant only. 
> 
> Matthias: Just my superficial impression from [1]. Will look into TypeInfoFactory. 
> 
> Thanks again!
> 
> package com.mystuff
> import org.apache.flink.api.common.functions.RichFlatMapFunction
> import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
> import org.apache.flink.api.common.typeinfo.{TypeInformation}
> import org.apache.flink.api.scala._
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import org.apache.flink.util.Collector
> 
> case class TestCaseClass(id: String, pos: Int)
> 
> class MyRichFlatMap extends RichFlatMapFunction[TestCaseClass, String] {
>   implicit val ttestclass: TypeInformation[TestCaseClass] = createTypeInformation[TestCaseClass]
> 
>   lazy val myState: MapState[String, TestCaseClass] = getRuntimeContext.getMapState(
>     new MapStateDescriptor[String, TestCaseClass]("test-state", classOf[String], ttestclass.getTypeClass)
>   )
> 
>   override def flatMap(value: TestCaseClass, out: Collector[String]): Unit = {
>     myState.put(value.id, value)
>     myState.get(value.id)
>     out.collect(value.id)
>   }
> }
> 
> object TestJob {
> 
>   def main(args: Array[String]): Unit = {
> 
>     val env = StreamExecutionEnvironment.createLocalEnvironment()
>     env.getConfig.disableGenericTypes()
> 
>     val s = Seq[TestCaseClass](
>       TestCaseClass(id = "1", pos = 1),
>       TestCaseClass(id = "2", pos = 2),
>       TestCaseClass(id = "3", pos = 3),
>     )
> 
>     env
>       .fromCollection[TestCaseClass](s)
>       .keyBy(s => s.id)
>       .flatMap(new MyRichFlatMap)
>       .print()
> 
>     env.execute("Test Job")
>   }
> }
> 
> [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#defining-type-information-using-a-factory
> 
> On Tue, Dec 7, 2021 at 2:25 PM Roman Grebennikov <gr...@dfdx.me> wrote:
>> Hi Lars,
>> 
>> can you please show a small reproducer of how you construct the DataStream, and which imports you use?
>> 
>> We also often experience similar performance issues with Scala, but usually they are related to accidental usage of the Flink Java API. A couple of hints from my experience:
>> 1. Make sure that you always use the Scala DataStream, and not the Java one.
>> 2. All operations on the Scala DataStream require an implicit TypeInformation[T] parameter, which is usually generated automatically for you by the createTypeInformation[T] macro if you do an "import org.apache.flink.api.scala._". So make sure you have this import present.
>> 3. You can do "env.getConfig.disableGenericTypes", and Flink will throw an exception each time it has to fall back to generic Kryo serialization. The backtrace will point you to the exact place in your code where the Kryo fallback happens.
>> 
>> Also, Flink will always revert to Kryo if you use sum types (ADTs, or "sealed traits"). Shameless plug: we made a library to support them: https://github.com/findify/flink-adt
>> 
>> Roman Grebennikov | grv@dfdx.me
>> 
>> 
>> On Tue, Dec 7, 2021, at 11:20, Matthias Pohl wrote:
>>> Hi Lars,
>>> I'm not sure about the out-of-the-box support for case classes with primitive member types (could you point to the section that made you conclude this?). I haven't used Scala with Flink yet, so maybe others can give more context.
>>> But have you looked into using the TypeInfoFactory to define the schema [1]?
>>> 
>>> Best,
>>> Matthias
>>> 
>>> [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#defining-type-information-using-a-factory
>>> 
>>> On Tue, Dec 7, 2021 at 10:03 AM Lars Skjærven <la...@gmail.com> wrote:
>>>> Hello,
>>>> We're running Flink 1.14 with Scala, and we suspect that performance is suffering due to the serialization of some Scala case classes. Specifically, we're seeing that our case class "cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType", and that the case class "does not contain a setter for field X". I'm interpreting these log messages as performance warnings. 
>>>> 
>>>> A simple example of a case class that we write to state and that triggers the mentioned 'warnings': 
>>>> 
>>>> case class Progress(position: Int, eventTime: Int, alive: Boolean)
>>>> 
>>>> My understanding of the docs is that case classes with primitive types should be supported "out of the box". 
>>>> 
>>>> Any tips on how to proceed? 
>>>> 
>>>> Kind regards, 
>>>> Lars
>>> 
>> 

Re: Scala Case Class Serialization

Posted by Lars Skjærven <la...@gmail.com>.
Thanks for the quick response. Please find below a minimal example
illustrating the issue. I've added implicit TypeInformation and checked
that I'm importing the Scala variant only.

Matthias: Just my superficial impression from [1]. Will look into
TypeInfoFactory.

Thanks again!

package com.mystuff
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.api.common.typeinfo.{TypeInformation}
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collector

case class TestCaseClass(id: String, pos: Int)

class MyRichFlatMap extends RichFlatMapFunction[TestCaseClass, String] {
  implicit val ttestclass: TypeInformation[TestCaseClass] =
    createTypeInformation[TestCaseClass]

  lazy val myState: MapState[String, TestCaseClass] = getRuntimeContext.getMapState(
    new MapStateDescriptor[String, TestCaseClass]("test-state", classOf[String], ttestclass.getTypeClass)
  )

  override def flatMap(value: TestCaseClass, out: Collector[String]): Unit = {
    myState.put(value.id, value)
    myState.get(value.id)
    out.collect(value.id)
  }
}

object TestJob {

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.createLocalEnvironment()
    env.getConfig.disableGenericTypes()

    val s = Seq[TestCaseClass](
      TestCaseClass(id = "1", pos = 1),
      TestCaseClass(id = "2", pos = 2),
      TestCaseClass(id = "3", pos = 3),
    )

    env
      .fromCollection[TestCaseClass](s)
      .keyBy(s => s.id)
      .flatMap(new MyRichFlatMap)
      .print()

    env.execute("Test Job")
  }
}

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#defining-type-information-using-a-factory

On Tue, Dec 7, 2021 at 2:25 PM Roman Grebennikov <gr...@dfdx.me> wrote:

> Hi Lars,
>
> can you please show a small reproducer of how you construct the
> DataStream, and which imports you use?
>
> We also often experience similar performance issues with Scala, but
> usually they are related to accidental usage of the Flink Java API. A
> couple of hints from my experience:
> 1. Make sure that you always use the Scala DataStream, and not the Java
> one.
> 2. All operations on the Scala DataStream require an implicit
> TypeInformation[T] parameter, which is usually generated automatically for
> you by the createTypeInformation[T] macro if you do an
> "import org.apache.flink.api.scala._". So make sure you have this import
> present.
> 3. You can do "env.getConfig.disableGenericTypes", and Flink will throw
> an exception each time it has to fall back to generic Kryo serialization.
> The backtrace will point you to the exact place in your code where the
> Kryo fallback happens.
>
> Also, Flink will always revert to Kryo if you use sum types (ADTs, or
> "sealed traits"). Shameless plug: we made a library to support them:
> https://github.com/findify/flink-adt
>
> Roman Grebennikov | grv@dfdx.me
>
>
> On Tue, Dec 7, 2021, at 11:20, Matthias Pohl wrote:
>
> Hi Lars,
> I'm not sure about the out-of-the-box support for case classes with
> primitive member types (could you point to the section that made you
> conclude this?). I haven't used Scala with Flink yet, so maybe others can
> give more context.
> But have you looked into using the TypeInfoFactory to define the schema
> [1]?
>
> Best,
> Matthias
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#defining-type-information-using-a-factory
>
> On Tue, Dec 7, 2021 at 10:03 AM Lars Skjærven <la...@gmail.com> wrote:
>
> Hello,
> We're running Flink 1.14 with Scala, and we suspect that performance is
> suffering due to the serialization of some Scala case classes. Specifically,
> we're seeing that our case class "cannot be used as a POJO type because not
> all fields are valid POJO fields, and must be processed as GenericType",
> and that the case class "does not contain a setter for field X". I'm
> interpreting these log messages as performance warnings.
>
> A simple example of a case class that we write to state and that triggers
> the mentioned 'warnings':
>
> case class Progress(position: Int, eventTime: Int, alive: Boolean)
>
> My understanding of the docs is that case classes with primitive types
> should be supported "out of the box".
>
> Any tips on how to proceed?
>
> Kind regards,
> Lars
>
>
>
>

Re: Scala Case Class Serialization

Posted by Roman Grebennikov <gr...@dfdx.me>.
Hi Lars,

can you please show a small reproducer of how you construct the DataStream, and which imports you use?

We also often experience similar performance issues with Scala, but usually they are related to accidental usage of the Flink Java API. A couple of hints from my experience:
1. Make sure that you always use the Scala DataStream, and not the Java one.
2. All operations on the Scala DataStream require an implicit TypeInformation[T] parameter, which is usually generated automatically for you by the createTypeInformation[T] macro if you do an "import org.apache.flink.api.scala._". So make sure you have this import present.
3. You can do "env.getConfig.disableGenericTypes", and Flink will throw an exception each time it has to fall back to generic Kryo serialization. The backtrace will point you to the exact place in your code where the Kryo fallback happens (see the sketch right below).
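
As a minimal sketch of that setup (environment creation only; using the standard entry point rather than the local environment from a test job):

import org.apache.flink.api.scala._ // brings the createTypeInformation[T] macro into scope
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Throw an exception instead of silently falling back to Kryo,
// so the backtrace points at the offending operator:
env.getConfig.disableGenericTypes()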

Also, Flink will always revert to Kryo if you use sum types (ADTs, or "sealed traits"). Shameless plug: we made a library to support them: https://github.com/findify/flink-adt
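
For illustration, usage looks roughly like this (a sketch from memory of the flink-adt README; the PlayerEvent ADT is made up for the example, and the exact derivation call may differ between library versions):

import io.findify.flinkadt.api._
import org.apache.flink.api.common.typeinfo.TypeInformation

sealed trait PlayerEvent
case class Play(position: Int) extends PlayerEvent
case class Stop(position: Int) extends PlayerEvent

// Derive a Flink-native serializer for the whole sealed hierarchy,
// so instances are not handed to Kryo:
implicit val eventTypeInfo: TypeInformation[PlayerEvent] =
  deriveTypeInformation[PlayerEvent]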

Roman Grebennikov | grv@dfdx.me


On Tue, Dec 7, 2021, at 11:20, Matthias Pohl wrote:
> Hi Lars,
> I'm not sure about the out-of-the-box support for case classes with primitive member types (could you point to the section that made you conclude this?). I haven't used Scala with Flink yet, so maybe others can give more context.
> But have you looked into using the TypeInfoFactory to define the schema [1]?
> 
> Best,
> Matthias
> 
> [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#defining-type-information-using-a-factory
> 
> On Tue, Dec 7, 2021 at 10:03 AM Lars Skjærven <la...@gmail.com> wrote:
>> Hello,
>> We're running Flink 1.14 with Scala, and we suspect that performance is suffering due to the serialization of some Scala case classes. Specifically, we're seeing that our case class "cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType", and that the case class "does not contain a setter for field X". I'm interpreting these log messages as performance warnings. 
>> 
>> A simple example of a case class that we write to state and that triggers the mentioned 'warnings': 
>> 
>> case class Progress(position: Int, eventTime: Int, alive: Boolean)
>> 
>> My understanding of the docs is that case classes with primitive types should be supported "out of the box". 
>> 
>> Any tips on how to proceed? 
>> 
>> Kind regards, 
>> Lars
> 

Re: Scala Case Class Serialization

Posted by Matthias Pohl <ma...@ververica.com>.
Hi Lars,
I'm not sure about the out-of-the-box support for case classes with
primitive member types (could you point to the section that made you
conclude this?). I haven't used Scala with Flink yet, so maybe others can
give more context.
But have you looked into using the TypeInfoFactory to define the schema [1]?
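
For reference, here is an untested sketch of what such a factory might look like for the Progress case class from the original mail. It leans on the Scala-API classes CaseClassTypeInfo and ScalaCaseClassSerializer and spells out the field types and names by hand, so treat it as an assumption about Flink's internals rather than a recipe:

import java.lang.reflect.Type
import java.util

import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInfo, TypeInfoFactory, TypeInformation}
import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.api.scala.typeutils.{CaseClassTypeInfo, ScalaCaseClassSerializer}

// The annotation routes Flink's type extraction to the factory below.
@TypeInfo(classOf[ProgressTypeInfoFactory])
case class Progress(position: Int, eventTime: Int, alive: Boolean)

class ProgressTypeInfoFactory extends TypeInfoFactory[Progress] {
  override def createTypeInfo(
      t: Type,
      genericParameters: util.Map[String, TypeInformation[_]]): TypeInformation[Progress] = {
    // Field types and names listed manually, in declaration order.
    val fieldTypes: Seq[TypeInformation[_]] = Seq(
      BasicTypeInfo.INT_TYPE_INFO,     // position
      BasicTypeInfo.INT_TYPE_INFO,     // eventTime
      BasicTypeInfo.BOOLEAN_TYPE_INFO  // alive
    )
    val fieldNames = Seq("position", "eventTime", "alive")
    new CaseClassTypeInfo[Progress](
      classOf[Progress], Array.empty[TypeInformation[_]], fieldTypes, fieldNames) {
      override def createSerializer(config: ExecutionConfig): TypeSerializer[Progress] =
        new ScalaCaseClassSerializer[Progress](
          classOf[Progress], fieldTypes.map(_.createSerializer(config)).toArray)
    }
  }
}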

Best,
Matthias

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#defining-type-information-using-a-factory

On Tue, Dec 7, 2021 at 10:03 AM Lars Skjærven <la...@gmail.com> wrote:

> Hello,
> We're running Flink 1.14 with Scala, and we suspect that performance is
> suffering due to the serialization of some Scala case classes. Specifically,
> we're seeing that our case class "cannot be used as a POJO type because not
> all fields are valid POJO fields, and must be processed as GenericType",
> and that the case class "does not contain a setter for field X". I'm
> interpreting these log messages as performance warnings.
>
> A simple example of a case class that we write to state and that triggers
> the mentioned 'warnings':
> case class Progress(position: Int, eventTime: Int, alive: Boolean)
>
> My understanding of the docs is that case classes with primitive types
> should be supported "out of the box".
>
> Any tips on how to proceed?
>
> Kind regards,
> Lars
>