Posted to issues@flink.apache.org by "Seth Wiesman (Jira)" <ji...@apache.org> on 2019/11/01 19:50:00 UTC

[jira] [Commented] (FLINK-14354) Provide interfaces instead of abstract classes in org.apache.flink.state.api.functions

    [ https://issues.apache.org/jira/browse/FLINK-14354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16965058#comment-16965058 ] 

Seth Wiesman commented on FLINK-14354:
--------------------------------------

-1 I don't believe this is an appropriate change. 

 

1) This would break backwards compatibility.

2) RichFunction#open serves more than one purpose. Along with registering state descriptors, users also perform arbitrary life-cycle work there (spawning threads, creating db connections, etc). A state reader likely wants to perform a different set of background actions.

3) I don't believe it is reasonable to assume that a state reader application has access to the operator class that was used to write the savepoint.
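
To make point 2 concrete, here is a minimal sketch using stand-in types rather than the Flink classes. LifecycleFunction, ConnectionPool, WriterOperator, InheritingReader and StandaloneReader are all hypothetical names invented for this illustration. It only shows that a reader which inherits open() from the writing operator also inherits whatever side effects that method performs.

```scala
// Stand-in types for illustration only; these are NOT Flink classes.
abstract class LifecycleFunction {
  def open(): Unit
}

// Hypothetical external resource, used to observe side effects of open().
object ConnectionPool {
  var opened: Int = 0
  def connect(): Unit = opened += 1
}

// The "writer" operator: open() registers state AND performs life-cycle work.
class WriterOperator extends LifecycleFunction {
  var registeredState: List[String] = Nil
  override def open(): Unit = {
    registeredState = "boolean-state" :: registeredState
    ConnectionPool.connect() // life-cycle work unrelated to reading state
  }
}

// A reader that inherits from the writer also inherits the connection setup.
class InheritingReader extends WriterOperator

// A standalone reader registers only the state it needs.
class StandaloneReader extends LifecycleFunction {
  var registeredState: List[String] = Nil
  override def open(): Unit = {
    registeredState = "boolean-state" :: registeredState
  }
}
```

Calling open() on an InheritingReader increments ConnectionPool.opened, while calling it on a StandaloneReader does not. Under these assumptions, that is the kind of unwanted background action a reader would pick up through inheritance.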

> Provide interfaces instead of abstract classes in org.apache.flink.state.api.functions
> --------------------------------------------------------------------------------------
>
>                 Key: FLINK-14354
>                 URL: https://issues.apache.org/jira/browse/FLINK-14354
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / State Processor
>            Reporter: Mitch Wasson
>            Priority: Minor
>
> I've started using the new state processing API in Flink 1.9. Super useful and works great for the most part.
> However, I think there is an opportunity to simplify implementations that use the API. My request to enable these simplifications is to provide interfaces instead of (or in addition to) abstract classes in org.apache.flink.state.api.functions, and then have the state processing API require those interfaces.
> My use case involves maintaining and processing keyed state. This is accomplished with a KeyedProcessFunction:
> class BooleanProcess extends KeyedProcessFunction[String, String, String] {
>   var bool: ValueState[Boolean] = _
>   override def open(parameters: Configuration): Unit = {
>     bool = getRuntimeContext.getState(new ValueStateDescriptor("boolean-state", classOf[Boolean]))
>   }
>   override def processElement(value: String, ctx: KeyedProcessFunction[String, String, String]#Context, out: Collector[String]): Unit = {
>     if (bool.value) {
>       out.collect(value)
>     } else {
>       if (Math.random < 0.005) {
>         bool.update(true)
>         out.collect(value)
>       }
>     }
>   }
> }
>  
> I then use a KeyedStateReaderFunction like this to inspect savepoints/checkpoints:
>  
> class BooleanProcessStateReader extends KeyedStateReaderFunction[String, String] {
>   var bool: ValueState[Boolean] = _
>   override def open(parameters: Configuration): Unit = {
>     bool = getRuntimeContext.getState(new ValueStateDescriptor("boolean-state", classOf[Boolean]))
>   }
>   override def readKey(key: String, ctx: KeyedStateReaderFunction.Context, out: Collector[String]): Unit = {
>     out.collect(key)
>   }
> }
> Ideally, I would like my KeyedStateReaderFunction to look like this:
> class BooleanProcessStateReader extends BooleanProcess with KeyedStateReaderFunction[String, String] {
>   override def readKey(key: String, ctx: KeyedStateReaderFunction.Context, out: Collector[String]): Unit = {
>     out.collect(key)
>   }
> }
> However, this can't be done with the current API due to Java's single inheritance and KeyedStateReaderFunction being an abstract class.
> The code savings are rather trivial in this example. However, the change would make the state reader much easier to maintain, because the reader would automatically inherit state and life-cycle methods from the class whose state it is inspecting.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)