You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Ying Z (Jira)" <ji...@apache.org> on 2020/08/08 09:17:00 UTC

[jira] [Commented] (FLINK-15719) Exceptions when using scala types directly with the State Process API

    [ https://issues.apache.org/jira/browse/FLINK-15719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17173617#comment-17173617 ] 

Ying Z commented on FLINK-15719:
--------------------------------

Hi [~tzulitai], I'd like to help update the doc [1] to make it less error-prone — is that OK? Here is my test code for keyed state in Scala:
 # stateful process function to generate state
 # inputs: 1 2 3 4 5 6

 
{code:java}
// code placeholder
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

class StatefulFunctionWithTime extends KeyedProcessFunction[Int, Int, Void] {
  // Per-key state: the last seen value plus one, and the wall-clock time of every update.
  var state: ValueState[Int] = _
  var updateTimes: ListState[Long] = _

  @throws[Exception]
  override def open(parameters: Configuration): Unit = {
    // NOTE(review): createTypeInformation[Int] derives a Scala-specific serializer;
    // the commented-out Types.INT / Types.LONG lines are the Java-typed alternative
    // under discussion in this ticket.
    val valueDesc = new ValueStateDescriptor("state", createTypeInformation[Int])
//    val valueDesc = new ValueStateDescriptor("state", Types.INT)
    state = getRuntimeContext().getState(valueDesc)

    val timesDesc = new ListStateDescriptor("times", createTypeInformation[Long])
//    val timesDesc = new ListStateDescriptor("times", Types.LONG)
    updateTimes = getRuntimeContext().getListState(timesDesc)
  }

  @throws[Exception]
  override def processElement(value: Int, ctx: KeyedProcessFunction[Int, Int, Void]#Context, out: Collector[Void]): Unit = {
    // Store value + 1 and append the current timestamp for this key.
    state.update(value + 1)
    updateTimes.add(System.currentTimeMillis)
  }
}

/**
 * Streaming job that populates the keyed state read back by the State
 * Processor API test: reads ints from a local socket, keys by the value
 * itself, and runs [[StatefulFunctionWithTime]] under uid "my-uid".
 */
object KeyedStateSample {
  // Explicit main instead of `extends App`: the App trait's DelayedInit-based
  // initialization has well-known ordering pitfalls and is discouraged for
  // non-trivial entry points.
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Durable backend so checkpoints land on disk where the reader job can find them.
    val fsStateBackend = new FsStateBackend("file:///tmp/chk_dir")
    env.setStateBackend(fsStateBackend)
    env.enableCheckpointing(60000)
    // Retain externalized checkpoints on cancel so they can be loaded afterwards.
    env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

    env.socketTextStream("127.0.0.1", 8010)
      .map(_.toInt)
      .keyBy(i => i)
      .process(new StatefulFunctionWithTime)
      .uid("my-uid") // must match the uid passed to Savepoint.readKeyedState

    env.execute()
  }
}
{code}
 
 # read the state generated by code above, which outputs:
 # KeyedState(3,4,List(1596878053283))
KeyedState(5,6,List(1596878055023))
KeyedState(2,3,List(1596878052359))
KeyedState(4,5,List(1596878054098))
KeyedState(6,7,List(1596878056151))
KeyedState(1,2,List(1596878051332))

 
{code:java}
// code placeholder
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, ValueState, ValueStateDescriptor}
import org.apache.flink.api.java.ExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.state.memory.MemoryStateBackend
import org.apache.flink.state.api.Savepoint
import org.apache.flink.state.api.functions.KeyedStateReaderFunction
import org.apache.flink.util.Collector

import scala.collection.JavaConverters._

/**
 * Description:
 */
/**
 * Batch job that reads the keyed state written by the streaming sample
 * (operator uid "my-uid") out of a retained checkpoint and prints it.
 */
object TestReadState {
  // Explicit main instead of `extends App`: avoids the App/DelayedInit
  // initialization-order pitfalls for entry points.
  def main(args: Array[String]): Unit = {
    val bEnv      = ExecutionEnvironment.getExecutionEnvironment
    val savepoint = Savepoint.load(bEnv, "file:///tmp/chk_dir/f988137ef1df4597bebc596ef7c76626/chk-2", new MemoryStateBackend)
    val keyedState = savepoint.readKeyedState("my-uid", new ReaderFunction)
    keyedState.print()
  }

  /** One row per key: the key, its ValueState, and its ListState of update times. */
  case class KeyedState(key: Int, value: Int, times: List[Long])

  // NOTE(review): the key type parameter is java.lang.Integer (not Scala Int) —
  // presumably to match the serializer the writing job registered for the key;
  // confirm against the resolution of this ticket before changing it.
  class ReaderFunction extends KeyedStateReaderFunction[java.lang.Integer, KeyedState] {
    var state: ValueState[Int] = _
    var updateTimes: ListState[Long] = _

    @throws[Exception]
    override def open(parameters: Configuration): Unit = {
      // Descriptor names and type information must mirror the writing job exactly.
      val stateDescriptor = new ValueStateDescriptor("state", createTypeInformation[Int])
      state = getRuntimeContext().getState(stateDescriptor)

      val updateDescriptor = new ListStateDescriptor("times", createTypeInformation[Long])
      updateTimes = getRuntimeContext().getListState(updateDescriptor)
    }

    override def readKey(key: java.lang.Integer,
                         ctx: KeyedStateReaderFunction.Context,
                         out: Collector[KeyedState]): Unit = {
      // Emit one record per key; ListState is a java.lang.Iterable, hence asScala.
      val data = KeyedState(
        key,
        state.value(),
        updateTimes.get().asScala.toList)
      out.collect(data)
    }
  }
}
{code}
 

 

1. https://ci.apache.org/projects/flink/flink-docs-master/dev/libs/state_processor_api.html#keyed-state

> Exceptions when using scala types directly with the State Process API
> ---------------------------------------------------------------------
>
>                 Key: FLINK-15719
>                 URL: https://issues.apache.org/jira/browse/FLINK-15719
>             Project: Flink
>          Issue Type: Bug
>          Components: API / State Processor
>    Affects Versions: 1.9.1
>            Reporter: Ying Z
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Major
>
> I followed these steps to generate and read states:
>  # implements the example[1] `CountWindowAverage` in Scala (exactly the same), and run jobA => that works fine.
>  # execute `flink cancel -s ${JobID}` => savepoints was generated as expected.
>  # implements the example[2] `StatefulFunctionWithTime` in Scala(code below), and run jobB => failed, exceptions shows that "Caused by: org.apache.flink.util.StateMigrationException: The new key serializer must be compatible."
> ReaderFunction code as below:
> {code:java}
> // code placeholder
>   class ReaderFunction extends KeyedStateReaderFunction[Long, (Long, Long)] {
>     var countState: ValueState[(Long, Long)] = _
>     override def open(parameters: Configuration): Unit = {
>       val stateDescriptor = new ValueStateDescriptor("average", createTypeInformation[(Long, Long)])
>       countState = getRuntimeContext().getState(stateDescriptor)
>     }    override def readKey(key: Long, ctx: KeyedStateReaderFunction.Context, out: Collector[(Long, Long)]): Unit = {
>       out.collect(countState.value())
>     }
>   }
> {code}
> 1: [https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/state.html#using-managed-keyed-state] 
> 2: [https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/libs/state_processor_api.html#keyed-state] 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)