Posted to issues@flink.apache.org by "Kostas Kloudas (JIRA)" <ji...@apache.org> on 2018/09/14 10:10:00 UTC

[jira] [Comment Edited] (FLINK-10343) Expose setCurrentKey method to streamRuntimeContext

    [ https://issues.apache.org/jira/browse/FLINK-10343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16614544#comment-16614544 ] 

Kostas Kloudas edited comment on FLINK-10343 at 9/14/18 10:09 AM:
------------------------------------------------------------------

Hi [~aitozi]! 

This functionality is intentionally not exposed at the function level of abstraction. The reason is that if you want to go that "deep", then the right approach
is to implement your own operator and manually take care of the issues associated with caching, consistency, and memory allocation (so that you do not blow up the VM, etc.).
At the operator level, you have access to all the state-related methods.

In addition, if I am not mistaken ([~stefanrichter83@gmail.com] and [~azagrebin] may know more), this would require deeper changes in the way snapshot methods are called. This
is because keyed state is written before the function state. In other words, the keyed state is written and then function.snapshotState() is called, so the modifications from
the function would end up in the state being written in the "next" checkpoint.
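
To make the ordering argument concrete, here is a toy model in plain Java. It is not Flink code: ToyKeyedBackend, CachingFunction, and checkpoint are hypothetical names, and the write order is simply the one described above (keyed state first, then the function hook), which may not match the actual runtime.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of the snapshot ordering described above. Not Flink code. */
class SnapshotOrderingSketch {

    /** Hypothetical stand-in for a keyed state backend. */
    static final class ToyKeyedBackend {
        private final Map<String, Long> state = new HashMap<>();
        private String currentKey;

        void setCurrentKey(String key) { currentKey = key; }
        void update(long value) { state.put(currentKey, value); }
        /** Returns a copy of the state as it is at this instant. */
        Map<String, Long> snapshot() { return new HashMap<>(state); }
    }

    /** Hypothetical function that caches values on the heap and flushes them in its snapshot hook. */
    static final class CachingFunction {
        private final Map<String, Long> cache = new HashMap<>();

        void put(String key, long value) { cache.put(key, value); }

        void snapshotState(ToyKeyedBackend backend) {
            for (Map.Entry<String, Long> entry : cache.entrySet()) {
                backend.setCurrentKey(entry.getKey());
                backend.update(entry.getValue());
            }
            cache.clear();
        }
    }

    /** Assumed order: the keyed state is written first, then the function hook runs. */
    static Map<String, Long> checkpoint(ToyKeyedBackend backend, CachingFunction function) {
        Map<String, Long> written = backend.snapshot();
        function.snapshotState(backend); // too late for this checkpoint
        return written;
    }

    public static void main(String[] args) {
        ToyKeyedBackend backend = new ToyKeyedBackend();
        CachingFunction function = new CachingFunction();
        function.put("a", 1L);

        System.out.println("checkpoint 1: " + checkpoint(backend, function)); // checkpoint 1: {}
        System.out.println("checkpoint 2: " + checkpoint(backend, function)); // checkpoint 2: {a=1}
    }
}
```

Under this assumed order, the value cached before checkpoint 1 only shows up in checkpoint 2, so a restore from checkpoint 1 would lose it.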

Given the above, i.e. that the required changes are more than meets the eye and that this functionality is already exposed at the operator level, I would suggest *not* going for it.

Let me know if you agree.


was (Author: kkl0u):
Hi [~aitozi]! 

This functionality is intentionally not exposed at the function level of abstraction. The reason is that if you want to go that "deep", then the right approach
is to implement your own operator and manually take care of the issues associated with caching, consistency, and memory allocation (so that you do not blow up the VM, etc.).
At the operator level, you have access to all the state-related methods.

In addition, if I am not mistaken ([~stefanrichter83@gmail.com] and [~azagrebin] may know more), this would require deeper changes in the way snapshots are written. This
is because keyed state is written before the function state. In other words, the keyed state is written and then function.snapshotState() is called, so the modifications from
the function would end up in the state being written in the "next" checkpoint. Finally, changes to the order of writing the state may lead to backwards-compatibility issues.

Given the above, i.e. that the required changes are more than meets the eye and potentially break backwards compatibility, and that this functionality is already exposed
at the operator level, I would suggest *not* going for it.

Let me know if you agree.

> Expose setCurrentKey method to streamRuntimeContext
> ---------------------------------------------------
>
>                 Key: FLINK-10343
>                 URL: https://issues.apache.org/jira/browse/FLINK-10343
>             Project: Flink
>          Issue Type: Improvement
>          Components: Streaming
>    Affects Versions: 1.7.0
>            Reporter: aitozi
>            Assignee: aitozi
>            Priority: Major
>             Fix For: 1.7.0
>
>
> When we use reducing state, aggregating keyed state, and so on, we have to read the value from the state backend, update it with the user function, and then put it back into the state backend. Instead, we could cache certain data on the heap in a map and update the state once in the snapshot method:
> {code:java}
> snapshot() {
>     for (Map.Entry<String, String> entry : map.entrySet()) {
>         setCurrentKey(entry.getKey());
>         valueState.update(entry.getValue()); // put value back to state backend
>     }
> }
> {code}
> For this, we only need to expose setCurrentKey to the user function, which would let users cache partial keyed state in memory themselves.
> What's your opinion, [~stefanrichter83@gmail.com] [~azagrebin]?
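
The saving the reporter is after can be sketched with a small plain-Java model that counts backend accesses. It is only an illustration under stated assumptions: CountingBackend, addDirect, and flush are hypothetical names, the "state" is a simple per-key sum, and nothing here uses the Flink API.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy comparison of per-record state access versus a heap cache flushed once. Not Flink code. */
class HeapCacheSketch {

    /** Hypothetical state backend that counts how often it is read and written. */
    static final class CountingBackend {
        final Map<String, Long> store = new HashMap<>();
        int reads;
        int writes;

        long get(String key) { reads++; return store.getOrDefault(key, 0L); }
        void put(String key, long value) { writes++; store.put(key, value); }
    }

    /** Per-record read-modify-write: one read and one write for every element. */
    static void addDirect(CountingBackend backend, String key, long delta) {
        backend.put(key, backend.get(key) + delta);
    }

    /** Folds the on-heap partial sums into the backend: one read and one write per distinct key. */
    static void flush(CountingBackend backend, Map<String, Long> cache) {
        for (Map.Entry<String, Long> entry : cache.entrySet()) {
            backend.put(entry.getKey(), backend.get(entry.getKey()) + entry.getValue());
        }
        cache.clear();
    }

    public static void main(String[] args) {
        String[] keys = {"a", "b", "a", "a", "b"};

        CountingBackend direct = new CountingBackend();
        for (String key : keys) {
            addDirect(direct, key, 1L);
        }

        CountingBackend cached = new CountingBackend();
        Map<String, Long> cache = new HashMap<>();
        for (String key : keys) {
            cache.merge(key, 1L, Long::sum); // aggregate on the heap only
        }
        flush(cached, cache);

        System.out.println(direct.store.equals(cached.store)); // true
        System.out.println(direct.writes + " vs " + cached.writes); // 5 vs 2
    }
}
```

The model says nothing about when the flush happens relative to a checkpoint, which is exactly the point the comment above raises.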



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)