Posted to user@flink.apache.org by Krzysztof Chmielewski <kr...@gmail.com> on 2022/01/11 12:00:00 UTC

Sorting/grouping keys and State management in BATCH mode

Hi,
I'm reading the docs and FLIP-140 for BATCH mode [1][2], where it says
that "In BATCH mode, the configured state backend is ignored. Instead, the
input of a keyed operation is grouped by key (using sorting) and then we
process all records of a key in turn." [1]

I would like to ask:
1. Where (heap or off-heap) does Flink keep records for BATCH streams if
the configured state backend is ignored? In FLIP-140 I see that a new State
implementation was created that is prepared to keep only one key's value,
but there is no information about "where" in memory it is kept.

2. Where does the sorting algorithm keep its intermediate results?
How/who knows that there will be no more records for a given key?

If I get it right, sorting is done through the ExternalSorter class. Is
there any documentation or usage example for ExternalSorter, and a
description of the SortStage values such as READ, SORT and SPILL?

Regards,
Krzysztof Chmielewski


[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution_mode/
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-140%3A+Introduce+batch-style+execution+for+bounded+keyed+streams#FLIP140:Introducebatchstyleexecutionforboundedkeyedstreams-Howtosort/groupkeys

Re: Sorting/grouping keys and State management in BATCH mode

Posted by Dawid Wysakowicz <dw...@apache.org>.
Hey Krzysztof,

Re 1. I believe you are asking where the state is kept. It is stored in
memory, but bear in mind that state is only ever kept for the current
key. Once all records for a key are processed, the corresponding state is
discarded, as it won't be needed anymore.
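To make the "only the current key" point concrete, here is a minimal Java
sketch. SingleKeyState is a hypothetical class used only for illustration,
not part of Flink (the real logic lives in BatchExecutionKeyedStateBackend).
It holds the value for one key at a time in an ordinary field and drops it
as soon as the current key changes, assuming the input is grouped by key.

```java
import java.util.Objects;

// Hypothetical illustration, not Flink code: state for exactly one key at a time.
final class SingleKeyState<K, V> {
    private K currentKey;
    private V value;

    // Switch to another key. With key-grouped input the previous key is
    // finished, so its value is simply dropped.
    void setCurrentKey(K key) {
        if (!Objects.equals(currentKey, key)) {
            currentKey = key;
            value = null;
        }
    }

    V value() {
        return value;
    }

    void update(V newValue) {
        value = newValue;
    }
}
```

Calling setCurrentKey("a") again keeps the stored value; switching to "b"
starts from null, mirroring how the state of a finished key is discarded.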

Re 2. The sorting algorithm keeps records in serialized form in the
managed memory of an operator [2]. It may spill intermediate results to
local disks once it reaches the sort spilling threshold [1].
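The spilling behaviour can be pictured with a plain-Java external merge
sort. This is a sketch under simplifying assumptions, not Flink's
ExternalSorter: SpillingSorter is a hypothetical name, records are Strings
without line breaks kept on the heap (Flink keeps serialized records in
managed memory), a record count maxInMemory stands in for the memory budget
and the fraction set by taskmanager.runtime.sort-spilling-threshold, and
each spilled run is a temporary file merged back with a priority queue.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical external merge sort, NOT Flink's ExternalSorter.
// maxInMemory (a record count) stands in for the sort memory budget.
final class SpillingSorter {

    static List<String> sort(Iterator<String> input, int maxInMemory) throws IOException {
        List<Path> runs = new ArrayList<>();
        List<String> buffer = new ArrayList<>();
        while (input.hasNext()) {
            buffer.add(input.next());
            if (buffer.size() >= maxInMemory) { // budget full: spill a sorted run
                runs.add(spill(buffer));
                buffer.clear();
            }
        }
        if (runs.isEmpty()) { // everything fit in memory, no disk needed
            Collections.sort(buffer);
            return buffer;
        }
        if (!buffer.isEmpty()) {
            runs.add(spill(buffer));
        }
        return merge(runs);
    }

    // Sorts the buffer and writes it to a temp file, one record per line.
    private static Path spill(List<String> buffer) throws IOException {
        Collections.sort(buffer);
        Path run = Files.createTempFile("sort-run-", ".txt");
        Files.write(run, buffer);
        return run;
    }

    // k-way merge of the sorted runs; each heap entry is (record, run index).
    private static List<String> merge(List<Path> runs) throws IOException {
        List<BufferedReader> readers = new ArrayList<>();
        PriorityQueue<Map.Entry<String, Integer>> heap =
                new PriorityQueue<>(Map.Entry.<String, Integer>comparingByKey());
        List<String> out = new ArrayList<>();
        try {
            for (int i = 0; i < runs.size(); i++) {
                BufferedReader reader = Files.newBufferedReader(runs.get(i));
                readers.add(reader);
                String first = reader.readLine();
                if (first != null) {
                    heap.add(new AbstractMap.SimpleEntry<>(first, i));
                }
            }
            while (!heap.isEmpty()) {
                Map.Entry<String, Integer> smallest = heap.poll();
                out.add(smallest.getKey());
                String next = readers.get(smallest.getValue()).readLine();
                if (next != null) {
                    heap.add(new AbstractMap.SimpleEntry<>(next, smallest.getValue()));
                }
            }
        } finally {
            for (BufferedReader reader : readers) {
                reader.close();
            }
            for (Path run : runs) {
                Files.deleteIfExists(run);
            }
        }
        return out;
    }
}
```

With maxInMemory = 2, sorting d, b, a, c, b produces three runs on disk
([b, d], [a, c], [b]) that are merged into a, b, b, c, d; input that fits
in the budget never touches the disk.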

Re 2.5. We know there are no more records for a given key once, after
sorting, we receive a record whose key differs from the previous one.
(Sorting is applied on keys; it is more of a grouping than real
sorting.) This is leveraged, e.g., in
BatchExecutionKeyedStateBackend#setCurrentKey.
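The key-boundary rule can be sketched in plain Java, assuming records arrive
as (key, value) pairs already grouped by key and that we want a per-key
sum. SortedGroupReducer and record are hypothetical names used only for
illustration. A key change closes the previous group, the end of the
bounded input closes the last one, and seeing a finished key again means
the input was not grouped.

```java
import java.util.AbstractMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical per-key sum over input that is already grouped by key.
final class SortedGroupReducer {

    static Map.Entry<String, Integer> record(String key, int value) {
        return new AbstractMap.SimpleEntry<>(key, value);
    }

    static Map<String, Integer> sumPerKey(List<Map.Entry<String, Integer>> groupedInput) {
        Map<String, Integer> results = new LinkedHashMap<>();
        String currentKey = null;
        int sum = 0; // the only state kept: the running sum of the current key
        for (Map.Entry<String, Integer> rec : groupedInput) {
            String key = rec.getKey();
            if (!key.equals(currentKey)) {
                if (currentKey != null) {
                    // Key changed: no more records for currentKey can follow.
                    results.put(currentKey, sum);
                    sum = 0;
                }
                if (results.containsKey(key)) {
                    throw new IllegalStateException("input is not grouped by key: " + key);
                }
                currentKey = key;
            }
            sum += rec.getValue();
        }
        if (currentKey != null) {
            results.put(currentKey, sum); // end of bounded input closes the last group
        }
        return results;
    }
}
```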

ExternalSorter is not a public class, so there are no usage examples or
user-facing documentation. Unfortunately, the best you can get are the
javadocs/comments in the class itself.
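For the READ/SORT/SPILL question, one way to picture staged sorting is a
pipeline of three threads handing full buffers to each other through
queues. This is an assumption-laden sketch and not ExternalSorter's code:
StagedSort is a hypothetical name, buffers are plain in-memory lists, and
the "spill" stage just collects sorted runs instead of writing them to
disk. A final merge of the runs would still be needed to get one sorted
stream.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical three-stage pipeline (read -> sort -> spill), NOT ExternalSorter.
final class StagedSort {
    // Sentinel buffer, compared by identity, meaning "no more input".
    private static final List<String> END = new ArrayList<>(0);

    static List<List<String>> sortIntoRuns(Iterator<String> input, int bufferSize)
            throws InterruptedException {
        BlockingQueue<List<String>> toSort = new ArrayBlockingQueue<>(2);
        BlockingQueue<List<String>> toSpill = new ArrayBlockingQueue<>(2);
        List<List<String>> runs = new ArrayList<>(); // written only by the spill thread

        Thread reader = new Thread(() -> {
            try {
                List<String> buf = new ArrayList<>();
                while (input.hasNext()) {
                    buf.add(input.next());
                    if (buf.size() >= bufferSize) { // buffer full: hand it to the sort stage
                        toSort.put(buf);
                        buf = new ArrayList<>();
                    }
                }
                if (!buf.isEmpty()) {
                    toSort.put(buf);
                }
                toSort.put(END);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "read");

        Thread sorter = new Thread(() -> {
            try {
                for (List<String> buf = toSort.take(); buf != END; buf = toSort.take()) {
                    Collections.sort(buf);
                    toSpill.put(buf);
                }
                toSpill.put(END);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "sort");

        Thread spiller = new Thread(() -> {
            try {
                for (List<String> buf = toSpill.take(); buf != END; buf = toSpill.take()) {
                    runs.add(buf); // stand-in for writing a sorted run to local disk
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "spill");

        reader.start();
        sorter.start();
        spiller.start();
        reader.join();
        sorter.join();
        spiller.join(); // makes the spill thread's writes to runs visible here
        return runs;
    }
}
```

Because the stages run concurrently, the reader can fill the next buffer
while the previous one is still being sorted or spilled, which is the
usual motivation for splitting a sort into stages.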

Best,

Dawid

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#taskmanager-runtime-sort-spilling-threshold

[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/memory/mem_setup_tm/#configure-heap-and-managed-memory


On 11/01/2022 17:07, Chesnay Schepler wrote:
> Looping in Dawid who can hopefully answer your questions.

Re: Sorting/grouping keys and State management in BATCH mode

Posted by Chesnay Schepler <ch...@apache.org>.
Looping in Dawid who can hopefully answer your questions.