Posted to issues@flink.apache.org by StephanEwen <gi...@git.apache.org> on 2018/03/20 20:48:42 UTC

[GitHub] flink pull request #5732: [FLINK-9034] [FLINK-9035] [core] Fix state descrip...

GitHub user StephanEwen opened a pull request:

    https://github.com/apache/flink/pull/5732

    [FLINK-9034] [FLINK-9035] [core] Fix state descriptors

    ## What is the purpose of the change
    
    Fixes two issues with the `StateDescriptors` that are used to obtain state access in transformation functions:
    
    ### Broken Equals and hashCode
    
    `equals()` and `hashCode()` depend on fields that are not always set and that may change during the life of a state descriptor. That is especially problematic because the state descriptors are keys in a map, and if the meaning of `equals()` and `hashCode()` changes after insertion, the objects become keys that can no longer be referenced or matched.
    
    This pull request changes `equals()` and `hashCode()` to take only the state name and the descriptor type (by class) into account. Both are constant and do not change as part of serializer initialization.
    
    **Illustration of the problem:**
    
    The following code fails with a `NullPointerException`, because the `hashCode()` method tries to access the serializer field, which may be uninitialized at that point.
    
    ```java
    ValueStateDescriptor<String> descr = new ValueStateDescriptor<>("name", String.class);
    descr.hashCode(); // exception
    ```
    
    The `equals()` method is equally broken (no pun intended):
    ```java
    ValueStateDescriptor<String> a = new ValueStateDescriptor<>("name", String.class);
    ValueStateDescriptor<String> b = new ValueStateDescriptor<>("name", String.class);
    
    a.equals(b); // exception
    b.equals(a); // exception
    a.initializeSerializerUnlessSet(new ExecutionConfig());
    a.equals(b); // false
    b.equals(a); // exception
    b.initializeSerializerUnlessSet(new ExecutionConfig());
    a.equals(b); // true
    b.equals(a); // true
    ```
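    
    The approach described above (equality based only on the state name and the descriptor class) can be sketched roughly as follows. This is a minimal, self-contained illustration, not the actual Flink code: `SketchDescriptor`, `initializeSerializer` and `DescriptorEqualityDemo` are hypothetical names, and the serializer is modeled as a plain `Object`.
    
    ```java
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Objects;
    
    // Hypothetical stand-in for a state descriptor; not Flink's actual class.
    class SketchDescriptor {
        private final String name;
        // May stay null until it is initialized lazily, so it must not
        // take part in equals() or hashCode().
        private Object serializer;
    
        SketchDescriptor(String name) {
            this.name = Objects.requireNonNull(name, "name must not be null");
        }
    
        void initializeSerializer(Object serializer) {
            this.serializer = serializer;
        }
    
        @Override
        public int hashCode() {
            // Uses only values that are fixed at construction time.
            return 31 * name.hashCode() + getClass().hashCode();
        }
    
        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || o.getClass() != getClass()) {
                return false;
            }
            return name.equals(((SketchDescriptor) o).name);
        }
    }
    
    class DescriptorEqualityDemo {
        public static void main(String[] args) {
            SketchDescriptor a = new SketchDescriptor("name");
            SketchDescriptor b = new SketchDescriptor("name");
    
            Map<SketchDescriptor, String> states = new HashMap<>();
            states.put(a, "state");
    
            // Mutating the serializer after insertion no longer breaks the lookup.
            a.initializeSerializer(new Object());
            System.out.println(states.containsKey(a)); // prints "true"
            System.out.println(states.get(b));         // prints "state"
        }
    }
    ```
    
    Because neither method touches the lazily initialized field, the descriptor stays a valid map key before and after serializer initialization.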
    
    ### Type Information dropped prematurely
    
    The following code is currently problematic:
    ```java
    public class MyFunction extends RichMapFunction<A, B>  {
    
        private final ValueStateDescriptor<MyType> descr = new ValueStateDescriptor<>("state name", MyType.class);
    
        private ValueState<MyType> state;
    
        @Override
        public void open(Configuration cfg) {
            state = getRuntimeContext().getValueState(descr);
        }
    }
    ```
    
    The problem is that the state descriptor drops the type information and creates a serializer before it is serialized as part of shipping the function to the cluster. To do that, it initializes the serializer with an empty execution config, so the resulting serializer does not respect the job's actual `ExecutionConfig`, making serialization inconsistent.
    
    This is mainly an artifact from the days when dropping the type information before shipping was necessary, because the type info was not serializable. It now is, and we can fix that bug.
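    
    The intended behavior (keep the type information across serialization and create the serializer only once a real config is available) might look roughly like the sketch below. All types here are hypothetical stand-ins, not Flink classes: `SketchConfig` plays the role of the execution config, `SketchTypeInfo` the role of the type information, and the "serializer" is just a string so that the effect of the config is visible.
    
    ```java
    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.io.Serializable;
    
    // Hypothetical stand-in for an execution config.
    class SketchConfig implements Serializable {
        final boolean forceGenericSerializer;
    
        SketchConfig(boolean forceGenericSerializer) {
            this.forceGenericSerializer = forceGenericSerializer;
        }
    }
    
    // Hypothetical stand-in for serializable type information.
    class SketchTypeInfo implements Serializable {
        final String typeName;
    
        SketchTypeInfo(String typeName) {
            this.typeName = typeName;
        }
    
        // The result depends on the config, which is why the real config matters.
        String createSerializer(SketchConfig config) {
            return (config.forceGenericSerializer ? "generic:" : "specific:") + typeName;
        }
    }
    
    class LazyDescriptor implements Serializable {
        private SketchTypeInfo typeInfo; // kept across Java serialization
        private String serializer;       // null until a config is provided
    
        LazyDescriptor(SketchTypeInfo typeInfo) {
            this.typeInfo = typeInfo;
        }
    
        void initializeSerializerUnlessSet(SketchConfig config) {
            if (serializer == null) {
                serializer = typeInfo.createSerializer(config);
                typeInfo = null; // no longer needed once the serializer exists
            }
        }
    
        String getSerializer() {
            if (serializer == null) {
                throw new IllegalStateException("Serializer not yet initialized.");
            }
            return serializer;
        }
    
        // Round trip through Java serialization, as when a function is shipped.
        static LazyDescriptor ship(LazyDescriptor descriptor) {
            try {
                ByteArrayOutputStream bytes = new ByteArrayOutputStream();
                try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                    out.writeObject(descriptor);
                }
                try (ObjectInputStream in =
                        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                    return (LazyDescriptor) in.readObject();
                }
            } catch (IOException | ClassNotFoundException e) {
                throw new IllegalStateException("Serialization round trip failed", e);
            }
        }
    }
    
    class LazyDescriptorDemo {
        public static void main(String[] args) {
            LazyDescriptor shipped =
                    LazyDescriptor.ship(new LazyDescriptor(new SketchTypeInfo("MyType")));
            // The serializer is created only now, with the config of the running job.
            shipped.initializeSerializerUnlessSet(new SketchConfig(true));
            System.out.println(shipped.getSerializer()); // prints "generic:MyType"
        }
    }
    ```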
    
    ## Verifying this change
    
    **This change is sensitive, because it touches the structures that all users use to obtain access to persistent state.**
    
    This change adds a series of unit tests to validate the fixed behavior.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (yes / **no**)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (**yes** / no) - **All changes should preserve full API compatibility.**
      - The serializers: (yes / **no** / don't know)
      - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know): **Touches the structures that give access to checkpointed state.**
      - The S3 file system connector: (yes / **no** / don't know)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (yes / **no**)
      - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/StephanEwen/incubator-flink fix_state_descriptors

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5732.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5732
    
----
commit 8ff1284d28a056b91d91607584fab2a55fbcc86c
Author: Stephan Ewen <se...@...>
Date:   2018-03-20T14:15:08Z

    [hotfix] [core] Fix checkstyle in 'org.apache.flink.api.common.state'

commit dc0df85e064ee45bcb0f83d21b00a1abc9359723
Author: Stephan Ewen <se...@...>
Date:   2018-03-20T14:29:12Z

    [hotfix] [core] Add missing serialVersionUID to MapStateDescriptor

commit c62e84414ff4715399bce35ac74fb1d94256c3ed
Author: Stephan Ewen <se...@...>
Date:   2018-03-20T14:43:33Z

    [hotfix] [core] Add @FunctionalInterface to KeySelector
    
    That clarifies that this interface should always be a SAM interface
    to allow that users created lambdas for its use.

commit 60f0327e8f12de6397e009d2d5c4134024c3e674
Author: Stephan Ewen <se...@...>
Date:   2018-03-20T14:36:19Z

    [hotfix] [core] Demockitofy state descriptor tests

commit e18ad3d7c39c7a92d2db1066ad968fa2e71e3233
Author: Stephan Ewen <se...@...>
Date:   2018-03-20T14:44:27Z

    [hotfix] [core] Make State Descriptors consistently use Preconditions instead of Objects.

commit 3216f5ad0c7d6964ae885979aa5fcfe9c8e19135
Author: Stephan Ewen <se...@...>
Date:   2018-03-20T15:22:12Z

    [FLINK-9034] [core] StateDescriptor does not throw away TypeInformation upon serialization.
    
    Throwing away TypeInformation upon serialization was previously done because the type
    information was not serializable. Now that it is serializable, we can (and should) keep
    it to provide consistent user experience, where all serializers respect the ExecutionConfig.

commit 7340bbec821e635cec7a4179a1444e0b0798bbdc
Author: Stephan Ewen <se...@...>
Date:   2018-03-20T15:46:13Z

    [hotfix] [core] Consilidate serializer duplication tests in StateDescriptorTest where possible

commit c060ee9acdac35e08b5d1873b19e7bbf4d6906e8
Author: Stephan Ewen <se...@...>
Date:   2018-03-20T16:16:06Z

    [FLINK-9035] [core] Fix state descriptor equals() and hashCode() handling

----


---

[GitHub] flink pull request #5732: [FLINK-9034] [FLINK-9035] [core] Fix state descrip...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5732#discussion_r176220129
  
    --- Diff: flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java ---
    @@ -149,13 +145,9 @@ public void testVeryLargeDefaultValue() throws Exception {
     	@SuppressWarnings("unchecked")
     	@Test
     	public void testSerializerDuplication() {
    -		TypeSerializer<String> statefulSerializer = mock(TypeSerializer.class);
    -		when(statefulSerializer.duplicate()).thenAnswer(new Answer<TypeSerializer<String>>() {
    -			@Override
    -			public TypeSerializer<String> answer(InvocationOnMock invocation) throws Throwable {
    -				return mock(TypeSerializer.class);
    -			}
    -		});
    +		// we need a serializer that actually duplicates for testing (a stateful one)
    --- End diff --
    
    See above


---

[GitHub] flink pull request #5732: [FLINK-9034] [FLINK-9035] [core] Fix state descrip...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5732#discussion_r175983599
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java ---
    @@ -77,18 +80,22 @@
     
     	/** The serializer for the type. May be eagerly initialized in the constructor,
     	 * or lazily once the type is serialized or an ExecutionConfig is provided. */
    +	@Nullable
     	protected TypeSerializer<T> serializer;
     
    +	/** The type information describing the value type. Only used to lazily create the serializer
    +	 * and dropped during serialization */
    +	@Nullable
    --- End diff --
    
    nit: Type information is no longer dropped during serialization; it is now dropped in `initializeSerializerUnlessSet()`.


---

[GitHub] flink pull request #5732: [FLINK-9034] [FLINK-9035] [core] Fix state descrip...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5732#discussion_r176220108
  
    --- Diff: flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java ---
    @@ -118,17 +115,14 @@ public void testValueStateDescriptorAutoSerializer() throws Exception {
     	@SuppressWarnings("unchecked")
     	@Test
     	public void testSerializerDuplication() {
    -		TypeSerializer<String> statefulSerializer = mock(TypeSerializer.class);
    -		when(statefulSerializer.duplicate()).thenAnswer(new Answer<TypeSerializer<String>>() {
    -			@Override
    -			public TypeSerializer<String> answer(InvocationOnMock invocation) throws Throwable {
    -				return mock(TypeSerializer.class);
    -			}
    -		});
    -
    -		ReduceFunction<String> reducer = mock(ReduceFunction.class);
    -
    -		ReducingStateDescriptor<String> descr = new ReducingStateDescriptor<>("foobar", reducer, statefulSerializer);
    +		// we need a serializer that actually duplicates for testing (a stateful one)
    --- End diff --
    
    See above


---

[GitHub] flink pull request #5732: [FLINK-9034] [FLINK-9035] [core] Fix state descrip...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5732#discussion_r176006260
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java ---
    @@ -77,18 +80,22 @@
     
     	/** The serializer for the type. May be eagerly initialized in the constructor,
     	 * or lazily once the type is serialized or an ExecutionConfig is provided. */
    +	@Nullable
     	protected TypeSerializer<T> serializer;
     
    +	/** The type information describing the value type. Only used to lazily create the serializer
    +	 * and dropped during serialization */
    +	@Nullable
    --- End diff --
    
    good catch, will fix that upon merging


---

[GitHub] flink pull request #5732: [FLINK-9034] [FLINK-9035] [core] Fix state descrip...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5732#discussion_r176221943
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java ---
    @@ -249,12 +257,13 @@ public boolean isSerializerInitialized() {
     	 */
     	public void initializeSerializerUnlessSet(ExecutionConfig executionConfig) {
    --- End diff --
    
    This is slightly orthogonal to this change, but: could we get rid of this method and instead change `getSerializer()` to `getSerializer(ExecutionConfig)`? That way, we don't have to be concerned about forgetting to call `initializeSerializerUnlessSet()`.
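    
    As a rough illustration of that idea (not a proposal for the actual Flink signature), a getter that takes the config could initialize the serializer on first access. `ConfigAwareDescriptor` and its type parameters are hypothetical; the sketch ignores serializer duplication, and it assumes the first config passed in wins.
    
    ```java
    import java.util.function.Function;
    
    // Hypothetical descriptor: C is the config type, S the serializer type.
    class ConfigAwareDescriptor<C, S> {
        private final Function<C, S> serializerFactory;
        private S serializer; // created on first access
    
        ConfigAwareDescriptor(Function<C, S> serializerFactory) {
            this.serializerFactory = serializerFactory;
        }
    
        // Callers cannot forget the initialization step because the
        // getter itself requires the config.
        synchronized S getSerializer(C config) {
            if (serializer == null) {
                serializer = serializerFactory.apply(config);
            }
            return serializer;
        }
    }
    ```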


---

[GitHub] flink issue #5732: [FLINK-9034] [FLINK-9035] [core] Fix state descriptors

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/5732
  
    @StephanEwen Yes, I think this should go into 1.5.0 because it fixes potential (and real) problems. And yes, I wasn't suggesting to remove `initializeSerializerUnlessSet(ExecutionConfig)` now, but it seemed like a good place to mention it. 😃 


---

[GitHub] flink issue #5732: [FLINK-9034] [FLINK-9035] [core] Fix state descriptors

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/5732
  
    Manually merged in f3a519712fb31f7b71181e876c3c3d5fff08eb71


---

[GitHub] flink pull request #5732: [FLINK-9034] [FLINK-9035] [core] Fix state descrip...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5732#discussion_r176219997
  
    --- Diff: flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java ---
    @@ -121,13 +117,9 @@ public void testValueStateDescriptorAutoSerializer() throws Exception {
     	@SuppressWarnings("unchecked")
     	@Test
     	public void testSerializerDuplication() {
    -		TypeSerializer<String> statefulSerializer = mock(TypeSerializer.class);
    -		when(statefulSerializer.duplicate()).thenAnswer(new Answer<TypeSerializer<String>>() {
    -			@Override
    -			public TypeSerializer<String> answer(InvocationOnMock invocation) throws Throwable {
    -				return mock(TypeSerializer.class);
    -			}
    -		});
    +		// we need a serializer that actually duplicates for testing (a stateful one)
    --- End diff --
    
    Same comment as above.


---

[GitHub] flink pull request #5732: [FLINK-9034] [FLINK-9035] [core] Fix state descrip...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5732#discussion_r176220075
  
    --- Diff: flink-core/src/test/java/org/apache/flink/api/common/state/MapStateDescriptorTest.java ---
    @@ -129,23 +125,12 @@ public void testMapStateDescriptorAutoSerializer() throws Exception {
     	 * <p>Tests that the returned serializer is duplicated. This allows to
     	 * share the state descriptor.
     	 */
    -	@SuppressWarnings("unchecked")
     	@Test
     	public void testSerializerDuplication() {
    -		TypeSerializer<String> keySerializer = mock(TypeSerializer.class);
    -		TypeSerializer<Long> valueSerializer = mock(TypeSerializer.class);
    -		when(keySerializer.duplicate()).thenAnswer(new Answer<TypeSerializer<String>>() {
    -			@Override
    -			public TypeSerializer<String> answer(InvocationOnMock invocation) throws Throwable {
    -				return mock(TypeSerializer.class);
    -			}
    -		});
    -		when(valueSerializer.duplicate()).thenAnswer(new Answer<TypeSerializer<Long>>() {
    -			@Override
    -			public TypeSerializer<Long> answer(InvocationOnMock invocation) throws Throwable {
    -				return mock(TypeSerializer.class);
    -			}
    -		});
    +		// we need a serializer that actually duplicates for testing (a stateful one)
    --- End diff --
    
    See above


---

[GitHub] flink pull request #5732: [FLINK-9034] [FLINK-9035] [core] Fix state descrip...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5732#discussion_r176220974
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java ---
    @@ -77,18 +80,22 @@
     
     	/** The serializer for the type. May be eagerly initialized in the constructor,
     	 * or lazily once the type is serialized or an ExecutionConfig is provided. */
    +	@Nullable
     	protected TypeSerializer<T> serializer;
     
    +	/** The type information describing the value type. Only used to lazily create the serializer
    --- End diff --
    
    nit: I think this was also a copying error in the original comment, but this is not necessarily a "value", unless we simply think of all state as a value, in which case I'm fine with this.


---

[GitHub] flink pull request #5732: [FLINK-9034] [FLINK-9035] [core] Fix state descrip...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5732#discussion_r176219956
  
    --- Diff: flink-core/src/test/java/org/apache/flink/api/common/state/AggregatingStateDescriptorTest.java ---
    @@ -41,16 +40,11 @@
     	 * <p>Tests that the returned serializer is duplicated. This allows to
     	 * share the state descriptor.
     	 */
    -	@SuppressWarnings("unchecked")
     	@Test
     	public void testSerializerDuplication() {
    -		TypeSerializer<Long> serializer = mock(TypeSerializer.class);
    -		when(serializer.duplicate()).thenAnswer(new Answer<TypeSerializer<Long>>() {
    -			@Override
    -			public TypeSerializer<Long> answer(InvocationOnMock invocation) throws Throwable {
    -				return mock(TypeSerializer.class);
    -			}
    -		});
    +		// we need a serializer that actually duplicates for testing (a stateful one)
    --- End diff --
    
    Will this condition always hold? Should we maybe guard this assumption with an assertion, i.e. assert that the result of `serialiser.duplicate()` is different from the original serialiser?
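    
    Such a guard could look roughly like the following sketch. It does not use the real Flink serializer classes: `DuplicatingSerializer`, `StatelessSerializer`, `StatefulSerializer` and `DuplicationGuard` are hypothetical stand-ins that only model the `duplicate()` contract.
    
    ```java
    // Hypothetical minimal serializer contract.
    interface DuplicatingSerializer {
        DuplicatingSerializer duplicate();
    }
    
    // Stateless: safe to share, so it may return itself.
    final class StatelessSerializer implements DuplicatingSerializer {
        @Override
        public DuplicatingSerializer duplicate() {
            return this;
        }
    }
    
    // Stateful: owns a buffer, so every user needs a separate copy.
    final class StatefulSerializer implements DuplicatingSerializer {
        private final byte[] buffer = new byte[64];
    
        @Override
        public DuplicatingSerializer duplicate() {
            return new StatefulSerializer();
        }
    }
    
    final class DuplicationGuard {
        // Fails fast if the serializer picked for the test does not really duplicate.
        static DuplicatingSerializer requireDuplicating(DuplicatingSerializer serializer) {
            if (serializer.duplicate() == serializer) {
                throw new IllegalArgumentException(
                        "Test assumption violated: serializer does not duplicate");
            }
            return serializer;
        }
    }
    ```
    
    A test would call `DuplicationGuard.requireDuplicating(...)` on its serializer before building the descriptor, so a violated assumption shows up as an explicit failure.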


---

[GitHub] flink issue #5732: [FLINK-9034] [FLINK-9035] [core] Fix state descriptors

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/5732
  
    Concerning guarding the test assumptions: I think it is fine, because if the KryoSerializer is not duplicating properly any more (assumption violated), the test will also fail. (The problematic case would be the other way around, a serializer that does not duplicate and we check that the same instance is returned).
    
    @aljoscha Do you think this should go into 1.5 as well?
    
    @aljoscha Concerning changing `initializeSerializerUnlessSet(ExecutionConfig)` to `getSerializer(ExecutionConfig)` - I think that is a good idea, but I would do that in a separate step. We need to rework how we store state descriptors and how we obtain and reconfigure serializers anyway.


---

[GitHub] flink pull request #5732: [FLINK-9034] [FLINK-9035] [core] Fix state descrip...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen closed the pull request at:

    https://github.com/apache/flink/pull/5732


---

[GitHub] flink pull request #5732: [FLINK-9034] [FLINK-9035] [core] Fix state descrip...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5732#discussion_r176222855
  
    --- Diff: flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java ---
    @@ -130,6 +135,31 @@ public void testInitializeSerializerAfterSerializationWithCustomConfig() throws
     				.getRegistration(File.class).getId() > 0);
     	}
     
    +	// ------------------------------------------------------------------------
    +	//  Tests for serializer initialization
    +	// ------------------------------------------------------------------------
    +
    +	/**
    +	 * FLINK-6775, tests that the returned serializer is duplicated.
    +	 * This allows to share the state descriptor across threads.
    +	 */
    +	@Test
    +	public void testSerializerDuplication() throws Exception {
    +		// we need a serializer that actually duplicates for testing (a stateful one)
    --- End diff --
    
    Same as above, we should assert that assumption


---