Posted to issues@flink.apache.org by sihuazhou <gi...@git.apache.org> on 2018/03/23 03:33:46 UTC
[GitHub] flink pull request #5751: [FLINK-9060][state] Deleting state using KeyedStat...
GitHub user sihuazhou opened a pull request:
https://github.com/apache/flink/pull/5751
[FLINK-9060][state] Deleting state using KeyedStateBackend.getKeys() throws Exception
## What is the purpose of the change
This PR fixes the exception thrown when state is deleted while iterating over the keys returned by `KeyedStateBackend.getKeys()`.
## Brief change log
- Copy the result of `getKeys()` into a `list` to avoid the concurrent-modification problem.
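
For illustration only, here is a minimal sketch of the failure mode and of the copy-based workaround. It uses a plain `HashMap` as a stand-in for the heap state table; the class and method names are hypothetical and this is not the code in the PR.

```java
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GetKeysCopyExample {

    // Stand-in for heap-based keyed state (assumption: not Flink's actual structure).
    static Map<String, Integer> newState() {
        Map<String, Integer> state = new HashMap<>();
        state.put("a", 1);
        state.put("b", 2);
        state.put("c", 3);
        return state;
    }

    // Deletes entries while iterating the live key view.
    // Returns true if a ConcurrentModificationException was thrown.
    static boolean clearWhileIteratingLiveView() {
        Map<String, Integer> state = newState();
        try {
            for (String key : state.keySet()) {
                state.remove(key); // structural modification during iteration
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    // Copies the keys into a list first, then deletes while iterating the copy.
    // Returns the number of entries left in the state.
    static int clearUsingCopiedKeys() {
        Map<String, Integer> state = newState();
        List<String> keys = new ArrayList<>(state.keySet()); // snapshot of the keys
        for (String key : keys) {
            state.remove(key); // safe: the iteration runs over the snapshot
        }
        return state.size();
    }

    public static void main(String[] args) {
        System.out.println("live view threw: " + clearWhileIteratingLiveView());
        System.out.println("entries left after copy-based clear: " + clearUsingCopiedKeys());
    }
}
```

The trade-off of the snapshot is the extra memory for one list of keys per call.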
## Verifying this change
- *add a unit test in `StateBackendTest#testConcurrentModificationWithGetKeys()` to verify this*
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)
## Documentation
no
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/sihuazhou/flink deletingStateUsingKeyedStateBackendGetKeys
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/5751.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #5751
----
commit cba3a32f3af16ee92676b1e5b82b21af6fee610d
Author: sihuazhou <su...@...>
Date: 2018-03-23T03:20:42Z
fix concurrency risk in HeapKeyedStateBackend#getKeys().
----
---
[GitHub] flink issue #5751: [FLINK-9060][state] Deleting state using KeyedStateBacken...
Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:
https://github.com/apache/flink/pull/5751
@kl0u Got it! Addressing...
---
[GitHub] flink issue #5751: [FLINK-9060][state] Deleting state using KeyedStateBacken...
Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:
https://github.com/apache/flink/pull/5751
Hi @kl0u, I changed the implementation slightly from what was discussed on JIRA: instead of implementing multiple wrapper classes for the different `State` types, I introduced a `StateInvocationHandler`, which implements `InvocationHandler`, to delegate the `clear()` method of `State`. Could you please have a look? Please let me know if you have any advice.
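
As a rough sketch of the general technique (not the actual `StateInvocationHandler` from this PR), a `java.lang.reflect.Proxy` can intercept `clear()` on any state interface and forward every other call. The `SimpleState` interface, the `DeferredClearHandler` class, and the choice to record the key instead of clearing immediately are all assumptions made for this example.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

public class ClearInterceptingProxyExample {

    /** Hypothetical minimal state interface; not Flink's State API. */
    public interface SimpleState {
        Integer value();
        void update(Integer v);
        void clear();
    }

    /** Trivial in-memory implementation used as the delegate. */
    public static class InMemoryState implements SimpleState {
        private Integer v;
        @Override public Integer value() { return v; }
        @Override public void update(Integer v) { this.v = v; }
        @Override public void clear() { v = null; }
    }

    /** Records clear() calls for a key instead of applying them; forwards all other calls. */
    public static class DeferredClearHandler implements InvocationHandler {
        private final SimpleState delegate;
        private final String currentKey;
        private final List<String> clearedKeys;

        public DeferredClearHandler(SimpleState delegate, String currentKey, List<String> clearedKeys) {
            this.delegate = delegate;
            this.currentKey = currentKey;
            this.clearedKeys = clearedKeys;
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            if (method.getName().equals("clear") && method.getParameterCount() == 0) {
                clearedKeys.add(currentKey); // remember the key, defer the real removal
                return null;
            }
            try {
                return method.invoke(delegate, args);
            } catch (InvocationTargetException e) {
                throw e.getCause(); // surface the delegate's own exception
            }
        }
    }

    /** Wraps a state object so that clear() is intercepted by the handler. */
    public static SimpleState wrap(SimpleState delegate, String key, List<String> clearedKeys) {
        return (SimpleState) Proxy.newProxyInstance(
                SimpleState.class.getClassLoader(),
                new Class<?>[] {SimpleState.class},
                new DeferredClearHandler(delegate, key, clearedKeys));
    }

    public static void main(String[] args) {
        List<String> cleared = new ArrayList<>();
        SimpleState raw = new InMemoryState();
        raw.update(7);
        SimpleState state = wrap(raw, "k1", cleared);
        state.clear();
        System.out.println("cleared keys: " + cleared + ", raw value: " + raw.value());
    }
}
```

One handler covers every state interface, which is what makes this cheaper to write than one wrapper class per `State` type.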
---
[GitHub] flink issue #5751: [FLINK-9060][state] Deleting state using KeyedStateBacken...
Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:
https://github.com/apache/flink/pull/5751
CC: @aljoscha @StefanRRichter
---
[GitHub] flink issue #5751: [FLINK-9060][state] Deleting state using KeyedStateBacken...
Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:
https://github.com/apache/flink/pull/5751
Hi @kl0u! I have changed the code. Could you please have a look when you have time?
---
[GitHub] flink issue #5751: [FLINK-9060][state] Deleting state using KeyedStateBacken...
Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:
https://github.com/apache/flink/pull/5751
Hi @kl0u, I have changed the code to address the comments on JIRA, but unfortunately I found a case that complicates that architecture: `State.clear()` may not be the last invocation on the state. For example:
```java
void process() {
    /* stuff before state.clear() */
    state.clear();
    /* stuff after state.clear() */
    Integer value = state.value(); // this should return the ValueState's default value, or null if no default value is set
    state.update(1); // this re-adds the state
}
```
So if we want to wrap the state, we need to handle almost every method of the state's API, not only `clear()`, and the `list` used to store the cleared `keys` has to become a `set` (because we need to check whether a `key` has been cleared, and possibly remove it from the cleared key set again). I think this may be a bit expensive, so I am now inclined to go back to the simplest approach: store the `keys` in a `list` and loop over the list to perform `process()`. What do you think?
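
To make the extra bookkeeping concrete, here is a small hypothetical sketch of a state wrapper with deferred clears. All names are invented for illustration and none of this is Flink code. Note that `value()` and `update()` both have to consult the cleared-key set, not just `clear()`.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class DeferredClearStateExample {
    // Stand-in for the backing state table (assumption, not Flink's structure).
    private final Map<String, Integer> backing = new HashMap<>();
    // Keys whose removal has been requested but not yet applied.
    private final Set<String> clearedKeys = new HashSet<>();

    public void update(String key, Integer value) {
        clearedKeys.remove(key); // re-adding the state cancels a pending clear
        backing.put(key, value);
    }

    public Integer value(String key) {
        // A cleared key must look empty even though the entry still exists.
        return clearedKeys.contains(key) ? null : backing.get(key);
    }

    public void clear(String key) {
        if (backing.containsKey(key)) {
            clearedKeys.add(key); // defer the structural removal
        }
    }

    // Applies all pending clears once iteration over the keys has finished.
    public void flushClears() {
        for (String key : clearedKeys) {
            backing.remove(key);
        }
        clearedKeys.clear();
    }

    public int size() {
        return backing.size();
    }

    public static void main(String[] args) {
        DeferredClearStateExample state = new DeferredClearStateExample();
        state.update("a", 5);
        state.clear("a");
        System.out.println("after clear: " + state.value("a"));
        state.update("a", 1);
        state.flushClears();
        System.out.println("after re-add and flush: " + state.value("a"));
    }
}
```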
---
[GitHub] flink issue #5751: [FLINK-9060][state] Deleting state using KeyedStateBacken...
Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on the issue:
https://github.com/apache/flink/pull/5751
Hi @sihuazhou .
I see what you are saying.
Let me think about it and I will get back to you probably tomorrow.
---
[GitHub] flink pull request #5751: [FLINK-9060][state] Deleting state using KeyedStat...
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/flink/pull/5751
---
[GitHub] flink issue #5751: [FLINK-9060][state] Deleting state using KeyedStateBacken...
Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on the issue:
https://github.com/apache/flink/pull/5751
Hi @sihuazhou! I think that for now materializing the list of keys and then passing it to `process` is the best solution. But keep in mind that this is needed only for the `HeapKeyedStateBackend` and not for RocksDB, so the copy should be created only in the heap backend.
---
[GitHub] flink issue #5751: [FLINK-9060][state] Deleting state using KeyedStateBacken...
Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:
https://github.com/apache/flink/pull/5751
@kl0u Got it! Addressing ...
---
[GitHub] flink issue #5751: [FLINK-9060][state] Deleting state using KeyedStateBacken...
Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on the issue:
https://github.com/apache/flink/pull/5751
There is a discussion on the architecture of the solution in the corresponding JIRA https://issues.apache.org/jira/browse/FLINK-9060
---
[GitHub] flink issue #5751: [FLINK-9060][state] Deleting state using KeyedStateBacken...
Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:
https://github.com/apache/flink/pull/5751
Hi @kl0u, have you had any thoughts on this? Alternatively, we could just buffer the keys into a `list` in `applyToAllKeys()`; I don't think that would be a bad choice. :)
---