Posted to issues@flink.apache.org by "Tzu-Li (Gordon) Tai (Jira)" <ji...@apache.org> on 2020/01/21 12:49:00 UTC

[jira] [Commented] (FLINK-15719) Exceptions when using scala types directly with the State Process API

    [ https://issues.apache.org/jira/browse/FLINK-15719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17020200#comment-17020200 ] 

Tzu-Li (Gordon) Tai commented on FLINK-15719:
---------------------------------------------

FYI: this issue was also discussed on the user mailing list: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Re-Questions-of-quot-State-Processing-API-in-Scala-quot-td32288.html

> Exceptions when using scala types directly with the State Process API
> ---------------------------------------------------------------------
>
>                 Key: FLINK-15719
>                 URL: https://issues.apache.org/jira/browse/FLINK-15719
>             Project: Flink
>          Issue Type: Bug
>          Components: API / State Processor
>    Affects Versions: 1.9.1
>            Reporter: Ying Z
>            Priority: Major
>
> I followed these steps to generate and read state:
>  # Implemented the example[1] `CountWindowAverage` in Scala (exactly the same) and ran jobA => this worked as expected.
>  # Executed `flink cancel -s ${JobID}` => the savepoint was generated as expected.
>  # Implemented the example[2] `StatefulFunctionWithTime` in Scala (code below) and ran jobB => it failed; the exception shows "Caused by: org.apache.flink.util.StateMigrationException: The new key serializer must be compatible."
> The ReaderFunction code is as follows:
> {code:java}
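> // Imports assumed; the original report omitted them.
> import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
> import org.apache.flink.api.scala.createTypeInformation
> import org.apache.flink.configuration.Configuration
> import org.apache.flink.state.api.functions.KeyedStateReaderFunction
> import org.apache.flink.util.Collector
>
>   class ReaderFunction extends KeyedStateReaderFunction[Long, (Long, Long)] {
>     var countState: ValueState[(Long, Long)] = _
>     // Register the "average" state so it can be read back from the savepoint.
>     override def open(parameters: Configuration): Unit = {
>       val stateDescriptor = new ValueStateDescriptor("average", createTypeInformation[(Long, Long)])
>       countState = getRuntimeContext().getState(stateDescriptor)
>     }
>     // Emit the stored (count, sum) tuple for each key.
>     override def readKey(key: Long, ctx: KeyedStateReaderFunction.Context, out: Collector[(Long, Long)]): Unit = {
>       out.collect(countState.value())
>     }
>   }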
> {code}
> 1: [https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/state.html#using-managed-keyed-state] 
> 2: [https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/libs/state_processor_api.html#keyed-state] 


