Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/03/20 20:49:00 UTC

[jira] [Commented] (FLINK-9034) State Descriptors drop TypeInformation on serialization

    [ https://issues.apache.org/jira/browse/FLINK-9034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407020#comment-16407020 ] 

ASF GitHub Bot commented on FLINK-9034:
---------------------------------------

GitHub user StephanEwen opened a pull request:

    https://github.com/apache/flink/pull/5732

    [FLINK-9034] [FLINK-9035] [core] Fix state descriptors

    ## What is the purpose of the change
    
    Fixes two issues with the `StateDescriptors` that are used to obtain state access in transformation functions:
    
    ### Broken Equals and hashCode
    
    `equals()` and `hashCode()` depend on fields that are not always set and that may change during the life of a state descriptor. That is especially problematic because state descriptors are used as keys in a map: if the result of `equals()` and `hashCode()` changes after insertion, the objects become keys that can no longer be looked up or matched.
    
    This pull request changes `equals()` and `hashCode()` to take only the state name and the descriptor type (by class) into account. Both are constant and do not change when the serializer is initialized.
    
    **Illustration of the problem:**
    
    The following code fails with a `NullPointerException`, because the `hashCode()` method tries to access the serializer field, which may be uninitialized at that point.
    
    ```java
    ValueStateDescriptor<String> descr = new ValueStateDescriptor<>("name", String.class);
    descr.hashCode(); // exception
    ```
    
    The `equals()` method is equally broken (no pun intended):
    ```java
    ValueStateDescriptor<String> a = new ValueStateDescriptor<>("name", String.class);
    ValueStateDescriptor<String> b = new ValueStateDescriptor<>("name", String.class);
    
    a.equals(b); // exception
    b.equals(a); // exception
    a.initializeSerializerUnlessSet(new ExecutionConfig());
    a.equals(b); // false
    b.equals(a); // exception
    b.initializeSerializerUnlessSet(new ExecutionConfig());
    a.equals(b); // true
    b.equals(a); // true
    ```
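    
    A minimal sketch of the fixed contract, using plain Java and no Flink classes. The names `SketchDescriptor`, `SketchValueDescriptor`, and `SketchListDescriptor` are hypothetical stand-ins, not the classes changed in this pull request. Equality and hash code depend only on the state name and the concrete descriptor class, so initializing the serializer after insertion does not invalidate a map key.
    
    ```java
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Objects;
    
    // Hypothetical stand-in for a state descriptor; not Flink's actual class.
    abstract class SketchDescriptor {
        private final String name;
        // Mutable and initially null, like a lazily created serializer.
        private Object serializer;
    
        SketchDescriptor(String name) {
            this.name = Objects.requireNonNull(name, "name must not be null");
        }
    
        void initializeSerializer() {
            if (serializer == null) {
                serializer = new Object();
            }
        }
    
        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            // Same concrete descriptor class and same name; the serializer is ignored.
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            return name.equals(((SketchDescriptor) o).name);
        }
    
        @Override
        public int hashCode() {
            return 31 * name.hashCode() + getClass().hashCode();
        }
    }
    
    class SketchValueDescriptor extends SketchDescriptor {
        SketchValueDescriptor(String name) { super(name); }
    }
    
    class SketchListDescriptor extends SketchDescriptor {
        SketchListDescriptor(String name) { super(name); }
    }
    
    class DescriptorEqualitySketch {
        public static void main(String[] args) {
            SketchDescriptor a = new SketchValueDescriptor("name");
            SketchDescriptor b = new SketchValueDescriptor("name");
    
            Map<SketchDescriptor, String> registry = new HashMap<>();
            registry.put(a, "state");
    
            // Changing the mutable field does not change the key's identity.
            a.initializeSerializer();
    
            System.out.println(registry.containsKey(a));                       // true
            System.out.println(a.equals(b) && b.equals(a));                    // true
            System.out.println(a.equals(new SketchListDescriptor("name")));    // false
        }
    }
    ```
    
    Because neither method touches the serializer field, the calls are safe before, between, and after serializer initialization.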
    
    ### Type Information dropped prematurely
    
    The following code is currently problematic:
    ```java
    public class MyFunction extends RichMapFunction<A, B>  {
    
        private final ValueStateDescriptor<MyType> descr = new ValueStateDescriptor<>("state name", MyType.class);
    
        private ValueState<MyType> state;
    
        @Override
        public void open(Configuration cfg) {
            state = getRuntimeContext().getValueState(descr);
        }
    }
    ```
    
    The problem is that the state descriptor drops the type information and creates a serializer before the descriptor itself is serialized as part of shipping the function to the cluster. To do that, it initializes the serializer with an empty execution config, so the resulting serializer does not reflect the job's actual `ExecutionConfig` and serialization becomes inconsistent.
    
    This is mainly an artifact from the days when dropping the type information before shipping was necessary, because the type info was not serializable. It now is, and we can fix that bug.
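    
    The effect can be modeled with plain Java serialization. This is only a sketch under stated assumptions: `SketchConfig`, its `forceGenericSerializer` flag, `SketchTypeInfo`, `SketchStateDescriptor`, and `ShipSketch` are hypothetical names invented for illustration, and the "serializer" is reduced to a descriptive string. With `dropTypeInfoOnShip` set, the descriptor fixes its serializer against an empty config while being written; otherwise it keeps the type information and creates the serializer later from the config it is given.
    
    ```java
    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.io.Serializable;
    
    // Hypothetical stand-in for an execution config; the flag is invented.
    class SketchConfig {
        final boolean forceGenericSerializer;
        SketchConfig(boolean forceGenericSerializer) {
            this.forceGenericSerializer = forceGenericSerializer;
        }
    }
    
    // Hypothetical stand-in for serializable type information.
    class SketchTypeInfo implements Serializable {
        private static final long serialVersionUID = 1L;
        String createSerializer(SketchConfig config) {
            return config.forceGenericSerializer ? "generic" : "default";
        }
    }
    
    class SketchStateDescriptor implements Serializable {
        private static final long serialVersionUID = 1L;
        private final boolean dropTypeInfoOnShip;
        private SketchTypeInfo typeInfo = new SketchTypeInfo();
        private String serializer;
    
        SketchStateDescriptor(boolean dropTypeInfoOnShip) {
            this.dropTypeInfoOnShip = dropTypeInfoOnShip;
        }
    
        void initializeSerializerUnlessSet(SketchConfig config) {
            if (serializer == null) {
                serializer = typeInfo.createSerializer(config);
            }
        }
    
        String getSerializer() {
            return serializer;
        }
    
        private void writeObject(ObjectOutputStream out) throws IOException {
            if (dropTypeInfoOnShip && serializer == null) {
                // Models the problematic path: an empty config decides the serializer.
                serializer = typeInfo.createSerializer(new SketchConfig(false));
                typeInfo = null;
            }
            out.defaultWriteObject();
        }
    }
    
    class ShipSketch {
        // Round-trips the descriptor through Java serialization, like shipping a function.
        static SketchStateDescriptor ship(SketchStateDescriptor descriptor) {
            try {
                ByteArrayOutputStream bytes = new ByteArrayOutputStream();
                try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                    out.writeObject(descriptor);
                }
                try (ObjectInputStream in =
                        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                    return (SketchStateDescriptor) in.readObject();
                }
            } catch (IOException | ClassNotFoundException e) {
                throw new IllegalStateException(e);
            }
        }
    
        public static void main(String[] args) {
            SketchConfig clusterConfig = new SketchConfig(true);
    
            SketchStateDescriptor dropped = ship(new SketchStateDescriptor(true));
            dropped.initializeSerializerUnlessSet(clusterConfig);
            System.out.println(dropped.getSerializer()); // default (config ignored)
    
            SketchStateDescriptor kept = ship(new SketchStateDescriptor(false));
            kept.initializeSerializerUnlessSet(clusterConfig);
            System.out.println(kept.getSerializer());    // generic (config respected)
        }
    }
    ```
    
    Keeping the type information through serialization defers the choice of serializer until a real config is available, which is the behavior this pull request aims for.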
    
    ## Verifying this change
    
    **This change is sensitive, because it touches the structures that all users use to obtain access to persistent state.**
    
    This change adds a series of unit tests to validate the fixed behavior.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (yes / **no**)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (**yes** / no) - **All changes should preserve full API compatibility.**
      - The serializers: (yes / **no** / don't know)
      - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know): **Touches the structures that give access to checkpointed state.**
      - The S3 file system connector: (yes / **no** / don't know)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (yes / **no**)
      - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/StephanEwen/incubator-flink fix_state_descriptors

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5732.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5732
    
----
commit 8ff1284d28a056b91d91607584fab2a55fbcc86c
Author: Stephan Ewen <se...@...>
Date:   2018-03-20T14:15:08Z

    [hotfix] [core] Fix checkstyle in 'org.apache.flink.api.common.state'

commit dc0df85e064ee45bcb0f83d21b00a1abc9359723
Author: Stephan Ewen <se...@...>
Date:   2018-03-20T14:29:12Z

    [hotfix] [core] Add missing serialVersionUID to MapStateDescriptor

commit c62e84414ff4715399bce35ac74fb1d94256c3ed
Author: Stephan Ewen <se...@...>
Date:   2018-03-20T14:43:33Z

    [hotfix] [core] Add @FunctionalInterface to KeySelector
    
    That clarifies that this interface should always be a SAM interface
    to allow that users created lambdas for its use.

commit 60f0327e8f12de6397e009d2d5c4134024c3e674
Author: Stephan Ewen <se...@...>
Date:   2018-03-20T14:36:19Z

    [hotfix] [core] Demockitofy state descriptor tests

commit e18ad3d7c39c7a92d2db1066ad968fa2e71e3233
Author: Stephan Ewen <se...@...>
Date:   2018-03-20T14:44:27Z

    [hotfix] [core] Make State Descriptors consistently use Preconditions instead of Objects.

commit 3216f5ad0c7d6964ae885979aa5fcfe9c8e19135
Author: Stephan Ewen <se...@...>
Date:   2018-03-20T15:22:12Z

    [FLINK-9034] [core] StateDescriptor does not throw away TypeInformation upon serialization.
    
    Throwing away TypeInformation upon serialization was previously done because the type
    information was not serializable. Now that it is serializable, we can (and should) keep
    it to provide consistent user experience, where all serializers respect the ExecutionConfig.

commit 7340bbec821e635cec7a4179a1444e0b0798bbdc
Author: Stephan Ewen <se...@...>
Date:   2018-03-20T15:46:13Z

    [hotfix] [core] Consolidate serializer duplication tests in StateDescriptorTest where possible

commit c060ee9acdac35e08b5d1873b19e7bbf4d6906e8
Author: Stephan Ewen <se...@...>
Date:   2018-03-20T16:16:06Z

    [FLINK-9035] [core] Fix state descriptor equals() and hashCode() handling

----


> State Descriptors drop TypeInformation on serialization
> -------------------------------------------------------
>
>                 Key: FLINK-9034
>                 URL: https://issues.apache.org/jira/browse/FLINK-9034
>             Project: Flink
>          Issue Type: Bug
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.5.0, 1.4.2
>            Reporter: Stephan Ewen
>            Assignee: Stephan Ewen
>            Priority: Major
>             Fix For: 1.6.0
>
>
> The following code currently causes problems
> {code}
> public class MyFunction extends RichMapFunction<A, B>  {
>     private final ValueStateDescriptor<MyType> descr = new ValueStateDescriptor<>("state name", MyType.class);
>     private ValueState<MyType> state;
>     @Override
>     public void open(Configuration cfg) {
>         state = getRuntimeContext().getValueState(descr);
>     }
> }
> {code}
> The problem is that the state descriptor drops the type information and creates a serializer before the descriptor itself is serialized as part of shipping the function to the cluster. To do that, it initializes the serializer with an empty execution config, so the resulting serializer does not reflect the job's actual ExecutionConfig and serialization becomes inconsistent.
> This is mainly an artifact from the days when dropping the type information before shipping was necessary, because the type info was not serializable. It now is, and we can fix that bug.


