Posted to user@flink.apache.org by Seth Wiesman <sw...@mediamath.com> on 2018/01/11 15:01:19 UTC

CaseClassTypeInfo fails to deserialize in flink 1.4 with Parent First Class Loading

This is less of a question and more of a PSA.

It looks like there is some sort of binary-incompatible change in the Scala standard library class `scala.collection.immutable.::` between point releases of Scala 2.11. A CaseClassTypeInfo generated by the type information macro fails to deserialize in user code under parent-first class loading (`classloader.resolve-order: parent-first`) unless the application is compiled with Scala 2.11.12. The following works with child-first class loading but fails with parent-first:


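import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala._ // brings the createTypeInformation macro into scope

case class CustomClass(a: Int, b: Float)

// The context bound adds an implicit TypeInformation[T] constructor parameter.
// Because map() uses it, it is stored as a field and Java-serialized with the function.
class CustomMapFunction[T >: Null : TypeInformation] extends MapFunction[String, T] {
  override def map(value: String): T = {
    val typeInfo = implicitly[TypeInformation[T]]

    // custom deserialization here
    null
  }
}

object ParentFirstRepro {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env
      .fromCollection(Iterator[String](""))
      .map(new CustomMapFunction[CustomClass]) // macro generates TypeInformation[CustomClass]
      .print()

    env.execute()
  }
}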
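Going by the observation above that the job works once it is compiled with 2.11.12, one workaround is to pin the compiler version in the build. A minimal sketch, assuming an sbt build (the thread does not say which build tool is used) and Flink 1.4.0 artifacts; the pinned version should match the Scala version of the Flink distribution actually deployed:

```scala
// build.sbt (sketch; assumes sbt and a Flink 1.4.0 cluster built against Scala 2.11.12)
scalaVersion := "2.11.12"

val flinkVersion = "1.4.0"

libraryDependencies ++= Seq(
  // "provided": Flink itself is supplied by the cluster at runtime
  "org.apache.flink" %% "flink-scala"           % flinkVersion % "provided",
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided"
)
```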




Seth Wiesman | Software Engineer, Data

4 World Trade Center, 46th Floor, New York, NY 10007




Re: CaseClassTypeInfo fails to deserialize in flink 1.4 with Parent First Class Loading

Posted by Timo Walther <tw...@apache.org>.
I filed an issue for this: https://issues.apache.org/jira/browse/FLINK-8451

On 1/12/18 at 4:40 PM, Seth Wiesman wrote:
> Here is the stack trace:
>
> org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
> 	at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:235)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:95)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:231)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> 	at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.$colon$colon to field org.apache.flink.api.scala.typeutils.CaseClassTypeInfo.fieldNames of type scala.collection.Seq in instance of com.mediamath.reporting.PerformanceJob$$anon$3
> 	at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2233)
> 	at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1405)
> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2293)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> 	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:433)
> 	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
> 	at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:248)
> 	at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:220)
> 	... 4 more
>
>
> Seth Wiesman | Software Engineer
> 4 World Trade Center, 46th Floor, New York, NY 10007
> swiesman@mediamath.com
>
> On 1/12/18, 9:12 AM, "Tzu-Li (Gordon) Tai" <tz...@apache.org> wrote:
>
>      Hi Seth,
>      
>      Thanks a lot for the report!
>      
>      I think your observation is expected behaviour, if there really is a
>      binary-incompatible change between Scala minor releases.
>      And yes, the type information macro in the Scala API is very sensitive to
>      the exact Scala version used. In the past I have also observed case class
>      serializers generated by the macro to be incompatible across different
>      Scala minor releases.
>      
>      Just curious, what exactly is the deserialization failure you observed when
>      using parent-first classloading?
>      Perhaps we should properly describe these surprises somewhere in the
>      documentation ...
>      
>      Cheers,
>      Gordon
>      
>      --
>      Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>      
>


Re: CaseClassTypeInfo fails to deserialize in flink 1.4 with Parent First Class Loading

Posted by Seth Wiesman <sw...@mediamath.com>.
Here is the stack trace: 

org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
	at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:235)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:95)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:231)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.$colon$colon to field org.apache.flink.api.scala.typeutils.CaseClassTypeInfo.fieldNames of type scala.collection.Seq in instance of com.mediamath.reporting.PerformanceJob$$anon$3
	at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2233)
	at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1405)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2293)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:433)
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
	at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:248)
	at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:220)
	... 4 more
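The trace shows the failure happening while Flink Java-deserializes the user function (`InstantiationUtil.deserializeObject`, then `ObjectInputStream.readObject`). The sketch below shows why the type information is involved at all: the context-bound evidence is a field of the function, so its `List` of field names (runtime class `::`) goes through the same serialization path. It uses hypothetical stand-ins (`FieldInfo`, `StaticFieldInfo`, `RoundTrip`) instead of Flink's `TypeInformation` so that it runs without Flink, and it does not reproduce the version mismatch itself, which presumably needs two different scala-library versions in play.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for TypeInformation: a serializable type class whose
// instances carry a Seq of field names, like CaseClassTypeInfo.fieldNames.
trait FieldInfo[T] extends Serializable {
  def fieldNames: Seq[String]
}

final case class StaticFieldInfo[T](fieldNames: Seq[String]) extends FieldInfo[T]

case class CustomClass(a: Int, b: Float)

object FieldInfo {
  // Plays the role of the macro-generated TypeInformation[CustomClass].
  implicit val customClassInfo: FieldInfo[CustomClass] =
    StaticFieldInfo[CustomClass](List("a", "b"))
}

// As in the thread's CustomMapFunction, the context bound becomes an implicit
// constructor parameter; since a method uses it, it is kept as a field and is
// written out whenever the function object is Java-serialized.
class CustomMapFunction[T: FieldInfo] extends Serializable {
  def fieldNames: Seq[String] = implicitly[FieldInfo[T]].fieldNames
}

object RoundTrip {
  // Serialize and deserialize with plain Java serialization.
  def roundTrip[A](obj: A): A = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(obj) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject().asInstanceOf[A] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val restored = roundTrip(new CustomMapFunction[CustomClass])
    // prints scala.collection.immutable.$colon$colon, the class named in the exception
    println(restored.fieldNames.getClass.getName)
    println(restored.fieldNames) // List(a, b)
  }
}
```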



Re: CaseClassTypeInfo fails to deserialize in flink 1.4 with Parent First Class Loading

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi Seth,

Thanks a lot for the report!

I think your observation is expected behaviour, if there really is a
binary-incompatible change between Scala minor releases.
And yes, the type information macro in the Scala API is very sensitive to
the exact Scala version used. In the past I have also observed case class
serializers generated by the macro to be incompatible across different
Scala minor releases.
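Since the macro output depends on the exact Scala version, it can help to check which scala-library a job really runs against, for example by logging it from a function running on the cluster. A minimal sketch using only standard APIs (`scala.util.Properties.versionNumberString` and the JVM's `ProtectionDomain`/`CodeSource`); the object name `ScalaRuntimeInfo` is hypothetical:

```scala
import scala.util.Properties

object ScalaRuntimeInfo {
  // Version of the scala-library actually loaded at runtime, e.g. "2.11.12".
  def libraryVersion: String = Properties.versionNumberString

  // Where the class behind scala.collection.immutable.:: was loaded from
  // (usually a jar). None if the JVM does not report a code source.
  def consLocation: Option[String] = {
    val consClass: Class[_] = List(0).getClass // runtime class of a non-empty List is ::
    Option(consClass.getProtectionDomain.getCodeSource)
      .flatMap(source => Option(source.getLocation))
      .map(_.toString)
  }

  def main(args: Array[String]): Unit = {
    println(s"scala-library version: $libraryVersion")
    println(s"scala.collection.immutable.:: loaded from: ${consLocation.getOrElse("unknown")}")
  }
}
```

Comparing this output under parent-first and child-first loading should show whether the job sees Flink's bundled Scala library or the one packaged with the job.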

Just curious, what exactly is the deserialization failure you observed when
using parent-first classloading?
Perhaps we should properly describe these surprises somewhere in the
documentation ...

Cheers,
Gordon




