Posted to issues@flink.apache.org by "Seth Wiesman (Jira)" <ji...@apache.org> on 2021/01/08 15:45:00 UTC

[jira] [Commented] (FLINK-20890) flink-state-processor-api: differents serializers being taken by Flink & State processor api

    [ https://issues.apache.org/jira/browse/FLINK-20890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17261384#comment-17261384 ] 

Seth Wiesman commented on FLINK-20890:
--------------------------------------

Hi [~fsilvestremorais]

 

The issue here is that Flink does not have a Scala State Processor API, so the Java serialization stack is used when writing out your savepoint. As a simple workaround, you can manually pass the Scala type information, and with it the serializer, to keyBy (see below). Long term, the solution is to add a proper Scala API; I'm not aware of anyone working on that.

 
{code:scala}
transformation.keyBy(_.accountId, implicitly[TypeInformation[Integer]])
{code}

> flink-state-processor-api: differents serializers being taken by Flink & State processor api
> --------------------------------------------------------------------------------------------
>
>                 Key: FLINK-20890
>                 URL: https://issues.apache.org/jira/browse/FLINK-20890
>             Project: Flink
>          Issue Type: Bug
>          Components: API / State Processor
>    Affects Versions: 1.12.0
>            Reporter: Felipe Silvestre Santos de Morais
>            Priority: Major
>             Fix For: 1.12.0
>
>         Attachments: flinkschemaevolution2.zip
>
>
> When a savepoint is triggered for a regular Flink Job with a keyed function, the key is serialized with 
> {noformat}
> flink.api.common.typeutils.base.IntSerializer{noformat}
> and the value serialized with
> {noformat}
> flink.api.scala.typeutils.ScalaCaseClassSerializerSnapshot{noformat}
>  
> When the savepoint is loaded with the State Processor API, transformed, and written back to disk, a different serializer is used.
> Key:
> {noformat}
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshot{noformat}
>  
> When the savepoint written by the State Processor API is then loaded, the following exception is thrown:
> {code:java}
> Caused by: org.apache.flink.util.StateMigrationException: The new key serializer (org.apache.flink.api.common.typeutils.base.IntSerializer@11a7ba62) must be compatible with the previous key serializer (org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer@3b56cc30).
>     at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:147)
>     at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:114)
>     ... 15 more
> {code}
>  
> The State Processor API should use the same serializer as Flink, since the type is exactly the same.
> I have attached a zip containing the code to reproduce this.
> The zipped project includes both the source and the rewritten savepoints.
>  
> Note:
> I have tried toggling the enableX/disableX serializer options, but with no success so far.


