Posted to issues@flink.apache.org by "Aljoscha Krettek (JIRA)" <ji...@apache.org> on 2015/10/18 10:59:05 UTC

[jira] [Commented] (FLINK-2747) TypeExtractor does not correctly analyze Scala Immutables (AnyVal)

    [ https://issues.apache.org/jira/browse/FLINK-2747?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14962234#comment-14962234 ] 

Aljoscha Krettek commented on FLINK-2747:
-----------------------------------------

I looked into this for a bit. I have a version that is almost running, but it is a very deep problem. After compilation, the classes from the example code in the issue description effectively look like this:

{code}
  class Id(val underlying: Int) extends AnyVal

  class X(var id: Int) {
    def this() { this(0) }
  }
{code}

where the Scala compiler automatically inserts instantiations of Id wherever one is required. Fixing this would require other types to be aware that they can contain value classes, or some other clever handling.
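
The erasure described here can be observed with plain Java reflection. The following is only a minimal sketch, not part of any Flink code: the object name ValueClassErasure and the helpers fieldTypeName and constructorParams are made up for illustration, and it assumes the compiler stores the value class field as its underlying primitive.

```scala
object ValueClassErasure {
  // Same shapes as in the issue description. Value classes must be
  // top-level or members of a statically accessible object.
  class Id(val underlying: Int) extends AnyVal

  class X(var id: Id) {
    def this() = this(new Id(0))
  }

  // Name of the runtime type of the `id` field, as seen by Java reflection.
  def fieldTypeName: String =
    classOf[X].getDeclaredField("id").getType.getName

  // Parameter type names of every public constructor of X.
  def constructorParams: Set[List[String]] =
    classOf[X].getConstructors
      .map(_.getParameterTypes.map(_.getName).toList)
      .toSet

  def main(args: Array[String]): Unit = {
    println(s"field type: $fieldTypeName")
    println(s"constructors: $constructorParams")
  }
}
```

If the field type is reported as the primitive rather than as Id, anything that inspects X at runtime sees a different field type than the one written in the Scala source.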

Maybe we should not fix this for 0.10, since we have the workaround of forcing the Kryo serializer.
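
For reference, forcing Kryo could look roughly like the sketch below. It assumes the Flink streaming Scala API is on the classpath; the object name ForceKryoSketch and the helper configuredEnv are hypothetical, while enableForceKryo() and isForceKryoEnabled are methods on ExecutionConfig.

```scala
import org.apache.flink.streaming.api.scala._

object ForceKryoSketch {
  // Returns an environment in which types that would otherwise be
  // analyzed as POJOs are serialized with Kryo instead.
  def configuredEnv(): StreamExecutionEnvironment = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.getConfig.enableForceKryo()
    env
  }

  def main(args: Array[String]): Unit = {
    val env = configuredEnv()
    println(s"force Kryo enabled: ${env.getConfig.isForceKryoEnabled}")
  }
}
```

The setting has to be applied before sources and operators are added to the environment, since serializers are created from the config when the job graph is built.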

> TypeExtractor does not correctly analyze Scala Immutables (AnyVal)
> ------------------------------------------------------------------
>
>                 Key: FLINK-2747
>                 URL: https://issues.apache.org/jira/browse/FLINK-2747
>             Project: Flink
>          Issue Type: Bug
>          Components: Type Serialization System
>    Affects Versions: 0.10
>            Reporter: Aljoscha Krettek
>            Assignee: Aljoscha Krettek
>            Priority: Blocker
>             Fix For: 0.10
>
>
> This example program only works correctly if Kryo is force-enabled.
> {code}
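> import org.apache.flink.streaming.api.functions.source.SourceFunction
> import org.apache.flink.streaming.api.scala._
>
> object Test {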
>   class Id(val underlying: Int) extends AnyVal
>   class X(var id: Id) {
>     def this() { this(new Id(0)) }
>   }
>   class MySource extends SourceFunction[X] {
>     def run(ctx: SourceFunction.SourceContext[X]) {
>       ctx.collect(new X(new Id(1)))
>     }
>     def cancel() {}
>   }
>   def main(args: Array[String]) {
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     env.addSource(new MySource).print
>     env.execute("Test")
>   }
> }
> {code}
> The program fails with this:
> {code}
> Caused by: java.lang.RuntimeException: Cannot instantiate class.
> 	at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.createInstance(PojoSerializer.java:227)
> 	at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:421)
> 	at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:110)
> 	at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:41)
> 	at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
> 	at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:125)
> 	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:136)
> 	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:55)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:198)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579)
> 	at java.lang.Thread.run(Thread.java:745)
> {code}


