Posted to issues@flink.apache.org by StephanEwen <gi...@git.apache.org> on 2018/03/21 12:37:03 UTC

[GitHub] flink pull request #5735: [FLINK-9036] [core] Add default values to State De...

GitHub user StephanEwen opened a pull request:

    https://github.com/apache/flink/pull/5735

     [FLINK-9036] [core] Add default values to State Descriptors via suppliers

    **This PR is based on #5732 and only the last two commits are relevant for this PR**
    
    ## What is the purpose of the change
    
    Earlier versions had a default value in `ValueState`. We dropped that, because the value would have to be duplicated on each access, to be safe against side effects when using mutable types.
    
    This pull request re-adds the feature, but using a supplier/factory function to create the default value on access. This is more efficient than copying a shared default value on access.
    
    ## Brief change log
    
      - The `StateDescriptor` produces default values through an optional `Supplier` function.
      - For backwards compatibility, the mode of passing a value directly is kept. The value is wrapped in a `DefaultValueFactory`, which implements the legacy functionality by using a serializer to copy the value on each access.
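
The difference between the two modes in the change log can be illustrated with a minimal sketch. It does not use Flink's classes: `SketchDescriptor`, `fromSupplier`, `fromValue`, and the `UnaryOperator` copier (a stand-in for the serializer-based copy) are hypothetical names chosen for illustration only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;

/** Hypothetical stand-in for a state descriptor; NOT Flink's StateDescriptor. */
final class SketchDescriptor<T> {
    // May be null, meaning "no default value configured".
    private final Supplier<T> defaultSupplier;

    private SketchDescriptor(Supplier<T> defaultSupplier) {
        this.defaultSupplier = defaultSupplier;
    }

    /** New mode (assumed shape): the supplier creates a fresh default on each access. */
    static <T> SketchDescriptor<T> fromSupplier(Supplier<T> supplier) {
        return new SketchDescriptor<>(supplier);
    }

    /** Legacy mode (assumed shape): a shared value is copied on each access. */
    static <T> SketchDescriptor<T> fromValue(T value, UnaryOperator<T> copier) {
        return new SketchDescriptor<>(() -> copier.apply(value));
    }

    /** Returns a default value that is safe to mutate, or null if none is configured. */
    T createDefaultValue() {
        return defaultSupplier == null ? null : defaultSupplier.get();
    }
}

final class SupplierDefaultSketch {
    public static void main(String[] args) {
        SketchDescriptor<List<String>> viaSupplier =
                SketchDescriptor.fromSupplier(ArrayList::new);

        List<String> shared = new ArrayList<>();
        SketchDescriptor<List<String>> viaCopy =
                SketchDescriptor.fromValue(shared, ArrayList::new);

        // Mutating one default must not affect the next one handed out.
        viaSupplier.createDefaultValue().add("mutated");
        viaCopy.createDefaultValue().add("mutated");

        System.out.println(viaSupplier.createDefaultValue().isEmpty()); // prints "true"
        System.out.println(viaCopy.createDefaultValue().isEmpty());     // prints "true"
    }
}
```

In both modes the caller receives an instance it can mutate freely; the supplier mode simply skips the copy step.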
    
    ## Verifying this change
    
      - This change adds a set of unit tests
      - The change modifies one example program (`StateMachineExample`). Running that example shows how the change works end-to-end.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (yes / no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no)
      - The serializers: (yes / no / don't know)
      - The runtime per-record code paths (performance sensitive): (yes / no / don't know)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
      - The S3 file system connector: (yes / no / don't know)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (yes / no)
      - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/StephanEwen/incubator-flink state_default_values

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5735.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5735
    
----
commit 1c756f0d6dfe71114a97a1b9effaf321b9da063b
Author: Stephan Ewen <se...@...>
Date:   2018-03-20T14:29:12Z

    [hotfix] [core] Add missing serialVersionUID to MapStateDescriptor

commit 186008c609635f99e4123912a632a4e068d3c532
Author: Stephan Ewen <se...@...>
Date:   2018-03-20T14:36:19Z

    [hotfix] [core] Demockitofy state descriptor tests

commit 98666506c193feffb3952d9d424d3aa924f40318
Author: Stephan Ewen <se...@...>
Date:   2018-03-20T14:44:27Z

    [hotfix] [core] Make State Descriptors consistently use Preconditions instead of Objects.

commit 1b286e4adbb5369df41c902bd161f5e854b862b8
Author: Stephan Ewen <se...@...>
Date:   2018-03-20T15:22:12Z

    [FLINK-9034] [core] StateDescriptor does not throw away TypeInformation upon serialization.
    
    Throwing away TypeInformation upon serialization was previously done because the type
    information was not serializable. Now that it is serializable, we can (and should) keep
    it to provide consistent user experience, where all serializers respect the ExecutionConfig.

commit 6064b3d49d75d40ea69a65f5e38724bf9119b526
Author: Stephan Ewen <se...@...>
Date:   2018-03-20T15:46:13Z

    [hotfix] [core] Consilidate serializer duplication tests in StateDescriptorTest where possible

commit a29b128f4f1bec49f1403aa21889e5890dc589ee
Author: Stephan Ewen <se...@...>
Date:   2018-03-20T16:16:06Z

    [FLINK-9035] [core] Fix state descriptor equals() and hashCode() handling

commit f19a4721acae62f8ba578c7cb235b6a917f3a258
Author: Stephan Ewen <se...@...>
Date:   2018-03-20T17:04:24Z

    [FLINK-9036] [core] Add default values to State Descriptors via suppliers

commit 6d7757017f52f7c3fd7cbe99d05f1de63186d12d
Author: Stephan Ewen <se...@...>
Date:   2018-03-20T18:51:02Z

    [FLINK-9036] [examples] Use state default value in StateMachineExample

----


---

[GitHub] flink pull request #5735: [FLINK-9036] [core] Add default values to State De...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5735#discussion_r176099489
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java ---
    @@ -55,7 +55,7 @@
     	 * @param typeClass The type of the values in the state.
     	 */
     	public FoldingStateDescriptor(String name, ACC initialValue, FoldFunction<T, ACC> foldFunction, Class<ACC> typeClass) {
    -		super(name, typeClass, initialValue);
    +		super(name, typeClass, DefaultValueFactory.create(typeClass, requireNonNull(initialValue)));
     		this.foldFunction = requireNonNull(foldFunction);
    --- End diff --
    
    I think using `Preconditions.checkNotNull()` instead of `requireNonNull()` here is more consistent with the previous commit: `Make State Descriptors consistently use Preconditions…`.


---

[GitHub] flink pull request #5735: [FLINK-9036] [core] Add default values to State De...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5735#discussion_r176340672
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java ---
    @@ -55,7 +55,7 @@
     	 * @param typeClass The type of the values in the state.
     	 */
     	public FoldingStateDescriptor(String name, ACC initialValue, FoldFunction<T, ACC> foldFunction, Class<ACC> typeClass) {
    -		super(name, typeClass, initialValue);
    +		super(name, typeClass, DefaultValueFactory.create(typeClass, requireNonNull(initialValue)));
     		this.foldFunction = requireNonNull(foldFunction);
    --- End diff --
    
    I had to remove the `checkNotNull()` altogether, because `null` is actually a valid initial value.


---

[GitHub] flink pull request #5735: [FLINK-9036] [core] Add default values to State De...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5735#discussion_r176099684
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java ---
    @@ -89,7 +89,7 @@ public FoldingStateDescriptor(String name, ACC initialValue, FoldFunction<T, ACC
     	 * @param typeSerializer The type serializer of the values in the state.
     	 */
     	public FoldingStateDescriptor(String name, ACC initialValue, FoldFunction<T, ACC> foldFunction, TypeSerializer<ACC> typeSerializer) {
    -		super(name, typeSerializer, initialValue);
    +		super(name, typeSerializer, DefaultValueFactory.create(typeSerializer, requireNonNull(initialValue)));
     		this.foldFunction = requireNonNull(foldFunction);
    --- End diff --
    
    same as above.


---

[GitHub] flink issue #5735: [FLINK-9036] [core] Add default values to State Descripto...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/5735
  
    @aljoscha Would be interested in your opinion here.
    
    This is basically one of two ways to improve the handling of default values:
      1. Add a default value supplier on the state descriptor (this approach). Advantage is that you can use this to backwards compatibly handle the previous cases of default values (including the starting value for folding state)
      2. Add a `T getOrDefault(Supplier<T>)` method to `ValueState`. This might even be simpler to do, and it is more flexible, as it allows for different default values in different contexts. It can get inefficient, though, when users naively create an anonymous class for the supplier (probably not a big deal any more since lambdas), and it breaks with the current approach, meaning we would have two different ways of handling default values that need to work together, one of which is deprecated but still needs to be supported until Flink 2.0.
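
For comparison, the second option could look roughly like the sketch below. Only the `T getOrDefault(Supplier<T>)` signature comes from the comment above; `SketchValueState` and `InMemoryValueState` are hypothetical names, and the sketch assumes that an unset state reads as `null`.

```java
import java.util.function.Supplier;

/** Hypothetical value-state interface; a sketch of option 2, NOT Flink's ValueState. */
interface SketchValueState<T> {
    /** Returns the stored value, or null when nothing is stored (assumption). */
    T value();

    void update(T value);

    /** Returns the stored value, or a default from the supplier when nothing is stored. */
    default T getOrDefault(Supplier<T> defaultSupplier) {
        T current = value();
        return current != null ? current : defaultSupplier.get();
    }
}

/** Trivial in-memory implementation, for illustration only. */
final class InMemoryValueState<T> implements SketchValueState<T> {
    private T stored;

    @Override
    public T value() {
        return stored;
    }

    @Override
    public void update(T value) {
        stored = value;
    }
}

final class GetOrDefaultSketch {
    public static void main(String[] args) {
        SketchValueState<Long> count = new InMemoryValueState<>();

        // Nothing stored yet, so the supplier provides the default.
        long before = count.getOrDefault(() -> 0L);
        count.update(before + 1);

        // A value is stored now, so this call site's default is ignored.
        long after = count.getOrDefault(() -> 100L);

        System.out.println(before + " " + after); // prints "0 1"
    }
}
```

The default lives at the call site here, so two call sites may use different defaults for the same state. That is the flexibility, and also the risk of inconsistency, weighed in this thread.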


---

[GitHub] flink issue #5735: [FLINK-9036] [core] Add default values to State Descripto...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/5735
  
    I think I'd favour the second approach because the default value is closer to where the return value is used. If it's defined "far away" when defining the state descriptor, it's easier for code to get out of sync. But I think both approaches would work well.
    
    Though I'm also a bit skeptical about adding the feature again unless a lot of people absolutely want it. I can see that it might be useful, but in the past we prematurely added a lot of API because we thought people might need it, and now we have a lot of legacy stuff that we need to keep (for example, all the "source" methods on `StreamExecutionEnvironment` and the various `write*()` methods on `DataStream`). Still not opposed, though.


---

[GitHub] flink pull request #5735: [FLINK-9036] [core] Add default values to State De...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5735#discussion_r176099599
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java ---
    @@ -72,7 +72,7 @@ public FoldingStateDescriptor(String name, ACC initialValue, FoldFunction<T, ACC
     	 * @param typeInfo The type of the values in the state.
     	 */
     	public FoldingStateDescriptor(String name, ACC initialValue, FoldFunction<T, ACC> foldFunction, TypeInformation<ACC> typeInfo) {
    -		super(name, typeInfo, initialValue);
    +		super(name, typeInfo, DefaultValueFactory.create(typeInfo, requireNonNull(initialValue)));
     		this.foldFunction = requireNonNull(foldFunction);
    --- End diff --
    
    same as above.


---