Posted to issues@flink.apache.org by StefanRRichter <gi...@git.apache.org> on 2016/09/22 08:02:49 UTC

[GitHub] flink pull request #2533: [FLINK-4603] Fixes: KeyedStateBackend cannot resto...

GitHub user StefanRRichter opened a pull request:

    https://github.com/apache/flink/pull/2533

    [FLINK-4603] Fixes: KeyedStateBackend cannot restore user code classes

    This PR fixes [FLINK-4603] and adds a test to better guard against future regressions.
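
Judging from the diff quoted further down in this thread, the fix stops reading the restored TypeSerializers through a plain `ObjectInputStream` and instead calls `InstantiationUtil.deserializeObject(fsDataInputStream, userCodeClassLoader)`. A plain `ObjectInputStream` resolves classes with a loader picked from the call stack, which inside the runtime is typically not the job's user-code class loader, so user classes can fail to load. The following is a minimal sketch of the class-loader-aware technique using only the JDK; `ClassLoaderAwareDeserialization` and its methods are hypothetical names, not Flink's actual implementation.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.util.Arrays;
import java.util.List;

public class ClassLoaderAwareDeserialization {

    /** ObjectInputStream that resolves classes against an explicitly supplied class loader. */
    static final class ClassLoaderObjectInputStream extends ObjectInputStream {
        private final ClassLoader classLoader;

        ClassLoaderObjectInputStream(InputStream in, ClassLoader classLoader) throws IOException {
            super(in);
            this.classLoader = classLoader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
            if (classLoader != null) {
                try {
                    // Look the class up in the supplied (e.g. user-code) class loader first.
                    return Class.forName(desc.getName(), false, classLoader);
                } catch (ClassNotFoundException e) {
                    // Fall through, e.g. for primitive type names such as "int".
                }
            }
            return super.resolveClass(desc);
        }
    }

    /** Hypothetical helper: reads one object from the stream using the given class loader. */
    @SuppressWarnings("unchecked")
    static <T> T deserializeObject(InputStream in, ClassLoader classLoader)
            throws IOException, ClassNotFoundException {
        // The stream is intentionally not closed: the caller may read further entries from it.
        ObjectInputStream ois = new ClassLoaderObjectInputStream(in, classLoader);
        return (T) ois.readObject();
    }

    static byte[] serialize(Object value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bytes)) {
            oos.writeObject(value);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) throws Exception {
        byte[] data = serialize(Arrays.asList(1, 2, 3));
        // Stand-in for the job's user-code class loader.
        ClassLoader userCodeClassLoader = ClassLoaderAwareDeserialization.class.getClassLoader();
        List<Integer> restored = deserializeObject(new ByteArrayInputStream(data), userCodeClassLoader);
        System.out.println(restored); // [1, 2, 3]
    }
}
```

The override only changes where classes are looked up; the wire format is the standard Java serialization format, so data written with a plain `ObjectOutputStream` can still be read.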

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/StefanRRichter/flink backend-classloader-fix

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2533.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2533
    
----
commit d6b8b0112c6a4cf3f2cbf5eb758599e15d796aab
Author: Stefan Richter <s....@data-artisans.com>
Date:   2016-09-21T12:55:58Z

    [FLINK-4603] KeyedStateBackend can restore user code classes

commit 78b2a4f048bd62e55471a384169304ca46bbbf60
Author: Stefan Richter <s....@data-artisans.com>
Date:   2016-09-21T15:56:08Z

    [FLINK-4603] Test case

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #2533: [FLINK-4603] Fixes: KeyedStateBackend cannot restore user...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/2533
  
    @StephanEwen, at least in the RocksDB backend we could remove user code completely. Right now, the only thing that needs to be serialized is the TypeSerializer from the ValueDescriptor. It is used in a check that ensures users cannot provide a descriptor with a different TypeSerializer than the one that was used initially. We might think about removing this to support versioning of TypeSerializers, but then how could we enforce compatibility between them?
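
The sanity check described here could be sketched as follows, assuming serializers implement a meaningful `equals()`. `SerializerSanityCheck`, `registerRestored`, `checkSerializer`, and the stand-in `FakeSerializer` are hypothetical names for illustration, not Flink API. The strict equality test also shows the tension raised in the comment: any new version of a serializer that is not `equals()` to the old one gets rejected.

```java
import java.util.HashMap;
import java.util.Map;

public class SerializerSanityCheck {

    /** Stand-in for a TypeSerializer; only equals()/hashCode() matter for the check. */
    static final class FakeSerializer {
        private final String type;

        FakeSerializer(String type) {
            this.type = type;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof FakeSerializer && ((FakeSerializer) o).type.equals(type);
        }

        @Override
        public int hashCode() {
            return type.hashCode();
        }

        @Override
        public String toString() {
            return "FakeSerializer(" + type + ")";
        }
    }

    // Serializers read back from the checkpoint, keyed by state name.
    private final Map<String, Object> restoredSerializers = new HashMap<>();

    void registerRestored(String stateName, Object serializer) {
        restoredSerializers.put(stateName, serializer);
    }

    /** Rejects a descriptor whose serializer differs from the one the state was written with. */
    void checkSerializer(String stateName, Object requested) {
        Object restored = restoredSerializers.get(stateName);
        if (restored != null && !restored.equals(requested)) {
            throw new IllegalStateException("State '" + stateName + "' was written with "
                    + restored + " but is accessed with " + requested);
        }
    }

    public static void main(String[] args) {
        SerializerSanityCheck backend = new SerializerSanityCheck();
        backend.registerRestored("counts", new FakeSerializer("Long"));
        backend.checkSerializer("counts", new FakeSerializer("Long")); // same serializer: accepted
        try {
            backend.checkSerializer("counts", new FakeSerializer("String"));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```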


[GitHub] flink issue #2533: [FLINK-4603] Fixes: KeyedStateBackend cannot restore user...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/2533
  
    I see, keeping the serializers for now probably makes sense.
    It just seems that there are also user functions in there (like fold, etc.); those should probably be removed. This may mean that we have to inject them back into the state descriptor later.
    
    Orthogonal issue, so +1 for this change.
    
    Merging this...
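
One way to keep user functions such as a fold function out of the checkpoint, and inject them back into the state descriptor later as suggested here, is to mark the function field `transient`. The sketch below is a hypothetical illustration using plain Java serialization; `FoldingDescriptorSketch`, `FoldingDescriptor`, and `injectFoldFunction` are made-up names, not Flink's actual descriptor classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.BinaryOperator;

public class FoldingDescriptorSketch {

    /** Hypothetical descriptor: name and initial value are checkpointed, the fold function is not. */
    static final class FoldingDescriptor implements Serializable {
        private static final long serialVersionUID = 1L;

        final String name;
        final Long initialValue;
        // transient: the user function is skipped by Java serialization.
        transient BinaryOperator<Long> foldFunction;

        FoldingDescriptor(String name, Long initialValue, BinaryOperator<Long> foldFunction) {
            this.name = name;
            this.initialValue = initialValue;
            this.foldFunction = foldFunction;
        }

        /** Re-attaches the user function after the descriptor has been restored. */
        void injectFoldFunction(BinaryOperator<Long> foldFunction) {
            this.foldFunction = foldFunction;
        }
    }

    static byte[] write(Serializable value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    static Object read(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        FoldingDescriptor original = new FoldingDescriptor("sum", 0L, Long::sum);
        FoldingDescriptor restored = (FoldingDescriptor) read(write(original));
        System.out.println(restored.foldFunction == null); // true: the function is not in the bytes
        restored.injectFoldFunction(Long::sum);
        System.out.println(restored.foldFunction.apply(restored.initialValue, 5L)); // 5
    }
}
```

The trade-off is that a restored descriptor is incomplete until the job re-injects its function, so the backend has to treat a missing function as "not yet bound" rather than as an error.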


[GitHub] flink pull request #2533: [FLINK-4603] Fixes: KeyedStateBackend cannot resto...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2533#discussion_r80005604
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java ---
    @@ -266,18 +265,20 @@ public void restorePartitionedState(List<KeyGroupsStateHandle> state) throws Exc
     			for (int i = 0; i < numKvStates; ++i) {
     				String stateName = inView.readUTF();
     
    -				ObjectInputStream ois = new ObjectInputStream(inView);
    +				TypeSerializer namespaceSerializer =
    +						InstantiationUtil.deserializeObject(fsDataInputStream, userCodeClassLoader);
    +				TypeSerializer stateSerializer =
    +						InstantiationUtil.deserializeObject(fsDataInputStream, userCodeClassLoader);
     
    -				TypeSerializer namespaceSerializer = (TypeSerializer) ois.readObject();
    -				TypeSerializer stateSerializer = (TypeSerializer) ois.readObject();
    -				StateTable<K, ?, ?> stateTable = new StateTable(stateSerializer,
    +				StateTable<K, ?, ?> stateTable = new StateTable(
    +						stateSerializer,
     						namespaceSerializer,
     						keyGroupRange);
     				stateTables.put(stateName, stateTable);
     				kvStatesById.put(i, stateName);
     			}
     
    -			for (int keyGroupIndex = keyGroupRange.getStartKeyGroup(); keyGroupIndex <= keyGroupRange.getEndKeyGroup(); keyGroupIndex++) {
    +			for (int keyGroupIndex = keyGroupRange.getStartKeyGroup();  keyGroupIndex <= keyGroupRange.getEndKeyGroup(); ++keyGroupIndex) {
    --- End diff --
    
    Was this wrong before?


[GitHub] flink pull request #2533: [FLINK-4603] Fixes: KeyedStateBackend cannot resto...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/2533


[GitHub] flink issue #2533: [FLINK-4603] Fixes: KeyedStateBackend cannot restore user...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/2533
  
    Please review @tillrohrmann or @aljoscha 


[GitHub] flink issue #2533: [FLINK-4603] Fixes: KeyedStateBackend cannot restore user...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/2533
  
    Fix looks good.
    It would probably be better not to have the user code in the checkpoint at all.
    Can we do that?


[GitHub] flink issue #2533: [FLINK-4603] Fixes: KeyedStateBackend cannot restore user...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/2533
  
    @StephanEwen we can do that, but then we won't have any sanity checks for the `TypeSerializer` anymore. Right now, even the RocksDB backend serializes the `TypeSerializer`/`StateDescriptor` with the checkpoint to verify that the user only accesses it with the correct `TypeSerializer`/`StateDescriptor`.
    
    I would be in favor of completely getting rid of user code there, even if it means losing those checks. Also, for this to work with the Heap backend we need to either always keep state on the heap in serialized form or deserialize lazily from restored serialized values using the `TypeSerializer` that we get from the user when they access state for the first time. 
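
The second option for the Heap backend, deserializing restored values lazily with the serializer the user supplies on first access, might look roughly like this. It is a hypothetical sketch: `LazyRestoredState` is a made-up name, and a `Function<byte[], V>` stands in for a real `TypeSerializer`.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class LazyRestoredState<K, V> {
    // Values read from the checkpoint, still in serialized form.
    private final Map<K, byte[]> serialized = new HashMap<>();
    // Values that have already been deserialized on access.
    private final Map<K, V> materialized = new HashMap<>();
    // Bound on the first access; no serializer is read from the checkpoint itself.
    private Function<byte[], V> deserializer;

    void restore(K key, byte[] bytes) {
        serialized.put(key, bytes);
    }

    /** Returns the value for key, deserializing it on first access; later calls reuse the bound deserializer. */
    V get(K key, Function<byte[], V> userDeserializer) {
        if (deserializer == null) {
            deserializer = userDeserializer;
        }
        V value = materialized.get(key);
        if (value == null) {
            byte[] bytes = serialized.remove(key);
            if (bytes != null) {
                value = deserializer.apply(bytes);
                materialized.put(key, value);
            }
        }
        return value;
    }

    /** Number of restored entries that have not been deserialized yet. */
    int pendingCount() {
        return serialized.size();
    }

    public static void main(String[] args) {
        LazyRestoredState<String, String> state = new LazyRestoredState<>();
        state.restore("a", "alpha".getBytes(StandardCharsets.UTF_8));
        state.restore("b", "beta".getBytes(StandardCharsets.UTF_8));
        Function<byte[], String> utf8 = bytes -> new String(bytes, StandardCharsets.UTF_8);
        System.out.println(state.get("a", utf8)); // alpha
        System.out.println(state.pendingCount()); // 1 ("b" is still serialized)
    }
}
```

Entries that are never touched stay as bytes, so in this sketch the backend never needs the user's classes until the job itself supplies a serializer.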


[GitHub] flink pull request #2533: [FLINK-4603] Fixes: KeyedStateBackend cannot resto...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2533#discussion_r80010321
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java ---
    @@ -266,18 +265,20 @@ public void restorePartitionedState(List<KeyGroupsStateHandle> state) throws Exc
     			for (int i = 0; i < numKvStates; ++i) {
     				String stateName = inView.readUTF();
     
    -				ObjectInputStream ois = new ObjectInputStream(inView);
    +				TypeSerializer namespaceSerializer =
    +						InstantiationUtil.deserializeObject(fsDataInputStream, userCodeClassLoader);
    +				TypeSerializer stateSerializer =
    +						InstantiationUtil.deserializeObject(fsDataInputStream, userCodeClassLoader);
     
    -				TypeSerializer namespaceSerializer = (TypeSerializer) ois.readObject();
    -				TypeSerializer stateSerializer = (TypeSerializer) ois.readObject();
    -				StateTable<K, ?, ?> stateTable = new StateTable(stateSerializer,
    +				StateTable<K, ?, ?> stateTable = new StateTable(
    +						stateSerializer,
     						namespaceSerializer,
     						keyGroupRange);
     				stateTables.put(stateName, stateTable);
     				kvStatesById.put(i, stateName);
     			}
     
    -			for (int keyGroupIndex = keyGroupRange.getStartKeyGroup(); keyGroupIndex <= keyGroupRange.getEndKeyGroup(); keyGroupIndex++) {
    +			for (int keyGroupIndex = keyGroupRange.getStartKeyGroup();  keyGroupIndex <= keyGroupRange.getEndKeyGroup(); ++keyGroupIndex) {
    --- End diff --
    
    I think I wanted to break this line because it exceeds the line-length limit, but then decided against it because IntelliJ messed up the formatting of for loops. Nothing is wrong there at all.
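
As the reply says, the change is purely cosmetic: in the update clause of a `for` loop the value of the increment expression is discarded, so `keyGroupIndex++` and `++keyGroupIndex` visit exactly the same key groups. A small self-contained check, using a hypothetical key-group range of 3 to 7:

```java
import java.util.ArrayList;
import java.util.List;

public class LoopIncrementEquivalence {

    // Original form: post-increment in the update clause.
    static List<Integer> postIncrement(int startKeyGroup, int endKeyGroup) {
        List<Integer> visited = new ArrayList<>();
        for (int keyGroupIndex = startKeyGroup; keyGroupIndex <= endKeyGroup; keyGroupIndex++) {
            visited.add(keyGroupIndex);
        }
        return visited;
    }

    // Form used in the PR: pre-increment in the update clause.
    static List<Integer> preIncrement(int startKeyGroup, int endKeyGroup) {
        List<Integer> visited = new ArrayList<>();
        for (int keyGroupIndex = startKeyGroup; keyGroupIndex <= endKeyGroup; ++keyGroupIndex) {
            visited.add(keyGroupIndex);
        }
        return visited;
    }

    public static void main(String[] args) {
        System.out.println(postIncrement(3, 7)); // [3, 4, 5, 6, 7]
        System.out.println(postIncrement(3, 7).equals(preIncrement(3, 7))); // true
    }
}
```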

