Posted to issues@flink.apache.org by "Dawid Wysakowicz (Jira)" <ji...@apache.org> on 2020/12/09 09:23:00 UTC

[jira] [Comment Edited] (FLINK-20491) Support Broadcast State in BATCH execution mode

    [ https://issues.apache.org/jira/browse/FLINK-20491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17246383#comment-17246383 ] 

Dawid Wysakowicz edited comment on FLINK-20491 at 12/9/20, 9:22 AM:
--------------------------------------------------------------------

Hey [~aljoscha]
Thanks for starting the work on this. Overall I think the approach of consuming non-keyed inputs before the keyed ones is good. I have a couple of questions/concerns about the current PoC that I think we should solve before diving into the review:
* Do we need to sort the "broadcast" side? We have no keys, so sorting is not strictly necessary for correctness. On the other hand, without sorting we lose the nice property that events arrive in timestamp order.
* For multi-input operators, if we do sort the non-keyed inputs, we might consume them intermittently (interleaving the inputs) to order them by timestamp. Is that desired?
* We are introducing a limitation that no user key may be serialized as an empty byte array; otherwise records of a keyed stream with such a key might be consumed before the non-keyed input.
* I think the empty-byte-array trick might have problems if the key uses a serializer with a fixed-length representation. Have you tried it with, e.g., Integer? This is probably just a problem of the implementation.
* Have you thought of completely separating the non-keyed inputs, i.e. not sorting them and consuming them first? We could enforce that in the InputSelector and CommonContext, ensuring that a non-keyed stream is consumed entirely before moving on to the next one. We would not need to give special meaning to an empty byte array. Unfortunately, the non-keyed inputs would then not be sorted by timestamp, and it would require slightly more changes in the DataInput.
* As for the changes in BatchExecutionKeyedStateBackend: theoretically, these functions can be used on the keyed side as well, and in that context simply doing nothing/returning empty results is wrong.
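To make the ordering concerns in the points above concrete, here is a small, self-contained Java model. It is a hypothetical sketch, not the PoC and not a Flink API: `EmptyKeySortSketch`, `SortRecord`, the raw UTF-8 String "serializer" and the 4-byte comparator are all assumptions made for illustration. It models records sorted by (serialized key bytes, timestamp), with non-keyed records using an empty byte array as their key. It shows that the empty array sorts before every non-empty key under unsigned lexicographic order, that a user key which also serializes to an empty array gets interleaved with the broadcast records by timestamp, that a comparator specialised for fixed-length keys cannot read the empty marker, and that draining the non-keyed input first avoids reserving any key value.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical model, not Flink code: each record carries its serialized key,
// non-keyed ("broadcast") records use an empty byte array as their key, and the
// sorter orders records by (key bytes, timestamp).
final class EmptyKeySortSketch {

    record SortRecord(byte[] key, long timestamp, String value) {}

    // Unsigned lexicographic order: a proper prefix sorts first, so the empty
    // array sorts before every non-empty key. Ties are broken by timestamp.
    static final Comparator<SortRecord> ORDER =
            Comparator.<SortRecord, byte[]>comparing(SortRecord::key, Arrays::compareUnsigned)
                    .thenComparingLong(SortRecord::timestamp);

    static List<String> sortedValues(List<SortRecord> records) {
        List<SortRecord> copy = new ArrayList<>(records);
        copy.sort(ORDER);
        List<String> out = new ArrayList<>();
        for (SortRecord r : copy) {
            out.add(r.value());
        }
        return out;
    }

    // Alternative: drain the non-keyed input completely, unsorted and in arrival
    // order, then the sorted keyed input. No key value has to be reserved.
    static List<String> separatedOrder(List<SortRecord> nonKeyed, List<SortRecord> keyed) {
        List<String> out = new ArrayList<>();
        for (SortRecord r : nonKeyed) {
            out.add(r.value());
        }
        out.addAll(sortedValues(keyed));
        return out;
    }

    // Hypothetical variable-length String "serializer" without a length prefix:
    // the empty String serializes to an empty array, i.e. the reserved marker.
    static byte[] rawUtf8(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    // Hypothetical fixed-length int serializer: always exactly 4 bytes.
    static byte[] fixedInt(int i) {
        return ByteBuffer.allocate(4).putInt(i).array();
    }

    // Hypothetical comparator specialised for 4-byte keys, as a sorter might use
    // for a fixed-length serializer; it throws BufferUnderflowException on the
    // empty marker because there are no 4 bytes to read.
    static int compareFixedInt(byte[] a, byte[] b) {
        return Integer.compare(ByteBuffer.wrap(a).getInt(), ByteBuffer.wrap(b).getInt());
    }

    public static void main(String[] args) {
        List<SortRecord> records = List.of(
                new SortRecord(rawUtf8("b"), 1L, "keyed b@1"),
                new SortRecord(new byte[0], 5L, "broadcast@5"),
                new SortRecord(rawUtf8("a"), 2L, "keyed a@2"),
                new SortRecord(rawUtf8(""), 3L, "keyed ''@3"));
        // The keyed record whose key serializes to an empty array sorts before
        // the broadcast record, because the tie is broken by timestamp.
        System.out.println(sortedValues(records));
    }
}
```

Running `main` prints `[keyed ''@3, broadcast@5, keyed a@2, keyed b@1]`: the keyed record with the empty serialized key is consumed before the broadcast record, which is the limitation described in the third point. With `separatedOrder`, the same two records come out broadcast-first regardless of their timestamps, at the cost of losing timestamp order across inputs.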


> Support Broadcast State in BATCH execution mode
> -----------------------------------------------
>
>                 Key: FLINK-20491
>                 URL: https://issues.apache.org/jira/browse/FLINK-20491
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / DataStream
>            Reporter: Aljoscha Krettek
>            Assignee: Aljoscha Krettek
>            Priority: Major
>              Labels: pull-request-available
>
> Right now, we don't support {{DataStream.connect(BroadcastStream)}} in {{BATCH}} execution mode. I believe we can add support for this without too much work.
> The key insight is that we can process the broadcast side before the non-broadcast side. Initially, we shied away from this because of concerns about {{ctx.applyToKeyedState()}}, which allows the broadcast side of the user function to access/iterate over state from the keyed side. We thought that we couldn't support this. However, since we know that we process the broadcast side first, we know that the keyed side will always be empty at that point. We can thus make this "keyed iteration" call a no-op instead of throwing an exception as we do now.
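The argument in the description, together with the concern about BatchExecutionKeyedStateBackend in the last point of the comment above, can be illustrated with a hypothetical, simplified state store. `BatchKeyedStateSketch` and its methods are assumptions for illustration only; they are not Flink's BatchExecutionKeyedStateBackend or any KeyedStateBackend API. Assuming, as in BATCH mode, that keyed input arrives grouped by key, the store only keeps state for the current key. Instead of hard-coding the all-keys iteration as a no-op, it iterates over whatever state actually exists: while the broadcast side is processed first the store is empty, so the call visits nothing, and the same method stays correct when it is called from the keyed side.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical, heavily simplified stand-in for a batch keyed state store.
// Assumes keyed input arrives grouped by key, so only the current key's state is kept.
final class BatchKeyedStateSketch<K, V> {
    private K currentKey;
    private final Map<K, V> state = new HashMap<>();

    void setCurrentKey(K key) {
        if (currentKey != null && !currentKey.equals(key)) {
            // Keys arrive grouped, so the previous key will never be seen again.
            state.clear();
        }
        currentKey = key;
    }

    void update(V value) {
        state.put(currentKey, value);
    }

    // Iterates over the state that actually exists instead of unconditionally
    // returning nothing. Returns the number of visited keys.
    int applyToAllKeys(BiConsumer<K, V> fn) {
        int visited = 0;
        for (Map.Entry<K, V> e : state.entrySet()) {
            fn.accept(e.getKey(), e.getValue());
            visited++;
        }
        return visited;
    }

    public static void main(String[] args) {
        BatchKeyedStateSketch<String, Integer> backend = new BatchKeyedStateSketch<>();
        // Broadcast side processed first: no keyed state exists yet, so nothing is visited.
        System.out.println(backend.applyToAllKeys((k, v) -> {}));
        // Keyed side: the same call now sees the current key's state.
        backend.setCurrentKey("a");
        backend.update(1);
        System.out.println(backend.applyToAllKeys((k, v) -> System.out.println(k + "=" + v)));
    }
}
```

The design choice here is that "empty" follows from the data rather than from a hard-coded no-op, so the method does not need to know which side of the operator is calling it.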



--
This message was sent by Atlassian Jira
(v8.3.4#803005)