You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Ying Z (Jira)" <ji...@apache.org> on 2020/08/08 09:17:00 UTC
[jira] [Commented] (FLINK-15719) Exceptions when using scala types
directly with the State Process API
[ https://issues.apache.org/jira/browse/FLINK-15719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17173617#comment-17173617 ]
Ying Z commented on FLINK-15719:
--------------------------------
Hi, [~tzulitai] , I want to help to modify the doc [1] to make it less error-prone, is it ok? Here is my test code of keyed state in scala lang:
# stateful process function to generate state
# inputs: 1 2 3 4 5 6
{code:java}
// code placeholder
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
class StatefulFunctionWithTime extends KeyedProcessFunction[Int, Int, Void] {
var state: ValueState[Int] = _
var updateTimes: ListState[Long] = _
@throws[Exception]
override def open(parameters: Configuration): Unit = {
val stateDescriptor = new ValueStateDescriptor("state", createTypeInformation[Int])
// val stateDescriptor = new ValueStateDescriptor("state", Types.INT)
state = getRuntimeContext().getState(stateDescriptor)
val updateDescriptor = new ListStateDescriptor("times", createTypeInformation[Long])
// val updateDescriptor = new ListStateDescriptor("times", Types.LONG)
updateTimes = getRuntimeContext().getListState(updateDescriptor)
}
@throws[Exception]
override def processElement(value: Int, ctx: KeyedProcessFunction[ Int, Int, Void ]#Context, out: Collector[Void]): Unit = {
state.update(value + 1)
updateTimes.add(System.currentTimeMillis)
}
}
object KeyedStateSample extends App {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val fsStateBackend = new FsStateBackend("file:///tmp/chk_dir")
env.setStateBackend(fsStateBackend)
env.enableCheckpointing(60000)
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
env.socketTextStream("127.0.0.1", 8010)
.map(_.toInt)
.keyBy(i => i)
.process(new StatefulFunctionWithTime)
.uid("my-uid")
env.execute()
}
{code}
# read the state generated by code above, which outputs:
# KeyedState(3,4,List(1596878053283))
KeyedState(5,6,List(1596878055023))
KeyedState(2,3,List(1596878052359))
KeyedState(4,5,List(1596878054098))
KeyedState(6,7,List(1596878056151))
KeyedState(1,2,List(1596878051332))
{code:java}
// code placeholder
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, ValueState, ValueStateDescriptor}
import org.apache.flink.api.java.ExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.state.memory.MemoryStateBackend
import org.apache.flink.state.api.Savepoint
import org.apache.flink.state.api.functions.KeyedStateReaderFunction
import org.apache.flink.util.Collector
import scala.collection.JavaConverters._
/**
* Description:
*/
object TestReadState extends App {
val bEnv = ExecutionEnvironment.getExecutionEnvironment
val savepoint = Savepoint.load(bEnv, "file:///tmp/chk_dir/f988137ef1df4597bebc596ef7c76626/chk-2", new MemoryStateBackend)
val keyedState = savepoint.readKeyedState("my-uid", new ReaderFunction)
keyedState.print()
case class KeyedState(key: Int, value: Int, times: List[Long])
class ReaderFunction extends KeyedStateReaderFunction[java.lang.Integer, KeyedState] {
var state: ValueState[Int] = _
var updateTimes: ListState[Long] = _
@throws[Exception]
override def open(parameters: Configuration): Unit = {
val stateDescriptor = new ValueStateDescriptor("state", createTypeInformation[Int])
state = getRuntimeContext().getState(stateDescriptor)
val updateDescriptor = new ListStateDescriptor("times", createTypeInformation[Long])
updateTimes = getRuntimeContext().getListState(updateDescriptor)
}
override def readKey(key: java.lang.Integer,
ctx: KeyedStateReaderFunction.Context,
out: Collector[KeyedState]): Unit = {
val data = KeyedState(
key,
state.value(),
updateTimes.get().asScala.toList)
out.collect(data)
}
}
}
{code}
1. https://ci.apache.org/projects/flink/flink-docs-master/dev/libs/state_processor_api.html#keyed-state
> Exceptions when using scala types directly with the State Process API
> ---------------------------------------------------------------------
>
> Key: FLINK-15719
> URL: https://issues.apache.org/jira/browse/FLINK-15719
> Project: Flink
> Issue Type: Bug
> Components: API / State Processor
> Affects Versions: 1.9.1
> Reporter: Ying Z
> Assignee: Tzu-Li (Gordon) Tai
> Priority: Major
>
> I followed these steps to generate and read states:
> # implements the example[1] `CountWindowAverage` in Scala(exactly same), and run jobA => that makes good.
> # execute `flink cancel -s ${JobID}` => savepoints was generated as expected.
> # implements the example[2] `StatefulFunctionWithTime` in Scala(code below), and run jobB => failed, exceptions shows that "Caused by: org.apache.flink.util.StateMigrationException: The new key serializer must be compatible."
> ReaderFunction code as below:
> {code:java}
> // code placeholder
> class ReaderFunction extends KeyedStateReaderFunction[Long, (Long, Long)] {
> var countState: ValueState[(Long, Long)] = _
> override def open(parameters: Configuration): Unit = {
> val stateDescriptor = new ValueStateDescriptor("average", createTypeInformation[(Long, Long)])
> countState = getRuntimeContext().getState(stateDescriptor)
> } override def readKey(key: Long, ctx: KeyedStateReaderFunction.Context, out: Collector[(Long, Long)]): Unit = {
> out.collect(countState.value())
> }
> }
> {code}
> 1: [https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/state.html#using-managed-keyed-state]
> 2: [https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/libs/state_processor_api.html#keyed-state]
--
This message was sent by Atlassian Jira
(v8.3.4#803005)