Posted to user@flink.apache.org by 刘 文 <th...@yahoo.com> on 2019/03/03 13:15:20 UTC

[Flink-Question] In Flink parallel computing, how does each window receive the data of its own partition, i.e. how does a window determine which partition number it belongs to?

1) Environment: Flink 1.7.2, WordCount run locally, stream processing.
2) In the source, RecordWriter.emit() assigns each element to a partition by key, so the partition of every element is already determined; the number of partitions is determined by DataStream.setParallelism(2).
3) copyFromSerializerToTargetChannel(int targetChannel) writes the data to the different channels, i.e. it sends each record (one by one) to the window that corresponds to its partition.
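For readers following along, here is a minimal, self-contained sketch of how, to my understanding, Flink 1.7 turns a key into a target channel: key -> hashCode() -> murmur hash -> key group -> downstream subtask index. The helper names are hypothetical re-implementations modelled on Flink's KeyGroupRangeAssignment and MathUtils.murmurHash, and the max parallelism of 128 is an assumed default; please check both against the Flink source for your version. The concrete key-to-channel assignment depends on the key type's hashCode(), so the output is not claimed to reproduce any particular table.

```java
// Simplified model (not Flink's actual classes) of keyed routing:
// key -> key group -> index of the downstream subtask (= output channel).
public class KeyToChannelSketch {

    // Assumed default max parallelism for a job with parallelism 2.
    static final int MAX_PARALLELISM = 128;

    // Modelled on MathUtils.murmurHash(int); always returns a non-negative value.
    static int murmurHash(int code) {
        code *= 0xcc9e2d51;
        code = Integer.rotateLeft(code, 15);
        code *= 0x1b873593;
        code = Integer.rotateLeft(code, 13);
        code = code * 5 + 0xe6546b64;
        code ^= 4;
        code = bitMix(code);
        if (code >= 0) {
            return code;
        } else if (code != Integer.MIN_VALUE) {
            return -code;
        } else {
            return 0;
        }
    }

    // Final avalanche step of the murmur hash.
    static int bitMix(int in) {
        in ^= in >>> 16;
        in *= 0x85ebca6b;
        in ^= in >>> 13;
        in *= 0xc2b2ae35;
        in ^= in >>> 16;
        return in;
    }

    // key -> key group in [0, maxParallelism)
    static int assignToKeyGroup(Object key, int maxParallelism) {
        return murmurHash(key.hashCode()) % maxParallelism;
    }

    // key group -> subtask index in [0, parallelism)
    static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    // The channel a record with this key would be written to.
    static int channelForKey(Object key, int maxParallelism, int parallelism) {
        return operatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));
    }

    public static void main(String[] args) {
        int parallelism = 2;
        // WordCount keys are words, so the keys are modelled as Strings here.
        for (int i = 1; i <= 10; i++) {
            String key = String.valueOf(i);
            System.out.println("key:" + key + "  channel:" + channelForKey(key, MAX_PARALLELISM, parallelism));
        }
    }
}
```

The point of the two-step mapping is that the partition of a record is a pure function of its key, so every record with the same key always lands on the same subtask.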

Re: [Flink-Question] In Flink parallel computing, how does each window receive the data of its own partition, i.e. how does a window determine which partition number it belongs to?

Posted by Piotr Nowojski <pi...@ververica.com>.
Hi,

I’m not sure I understand your question/concerns.

As Rong Rong explained, the key selector is used to assign records to the window operators. 

Within a key context, your operators/functions have no access to other keys or their values. When your reduce/process/… functions process key 1, they cannot access or see keys 2, 3, 4, 6 or 10, even if those keys are on the same machine. If you want to process records together, they must be keyed together appropriately. 
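To illustrate the point about key context, here is a toy model. It is hypothetical and is not Flink's KeyedStateBackend API: one subtask holds state for several keys, but user logic only ever reads and writes the value of the current key. The second half shows the "key them together" idea with a made-up grouping key (odd vs. even).

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of key-scoped state (hypothetical; not Flink's KeyedStateBackend API).
// User logic can only read/write the value stored under the current key.
public class KeyScopedStateSketch {

    // All state held by one parallel subtask, partitioned by key.
    private final Map<String, Integer> stateByKey = new HashMap<>();
    private String currentKey;

    // Set by the "runtime" before user logic runs.
    void setCurrentKey(String key) {
        this.currentKey = key;
    }

    // Returns the value for the current key only; other keys are not reachable here.
    Integer value() {
        return stateByKey.get(currentKey);
    }

    void update(Integer newValue) {
        stateByKey.put(currentKey, newValue);
    }

    // Reduce-style running sum: set the key context, then apply the user logic.
    void processElement(String key, int count) {
        setCurrentKey(key);
        Integer old = value();
        update(old == null ? count : old + count);
    }

    public static void main(String[] args) {
        // Keys "1" and "2" share one subtask, yet each sum only sees its own key.
        KeyScopedStateSketch subtask = new KeyScopedStateSketch();
        subtask.processElement("1", 1);
        subtask.processElement("2", 1);
        subtask.processElement("1", 1);
        subtask.setCurrentKey("1");
        System.out.println("key 1 -> " + subtask.value()); // key 1 -> 2

        // To combine records, key them by a shared value first
        // (hypothetical grouping: odd vs. even numbers).
        KeyScopedStateSketch rekeyed = new KeyScopedStateSketch();
        for (String word : new String[] {"1", "2", "3"}) {
            String groupKey = Integer.parseInt(word) % 2 == 0 ? "even" : "odd";
            rekeyed.processElement(groupKey, 1);
        }
        rekeyed.setCurrentKey("odd");
        System.out.println("odd -> " + rekeyed.value()); // odd -> 2
    }
}
```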

Piotrek



Re: [Flink-Question] In Flink parallel computing, how does each window receive the data of its own partition, i.e. how does a window determine which partition number it belongs to?

Posted by 刘 文 <th...@yahoo.com>.
Sorry, I still don't understand. Can I ask for help again?


For example, with a parallelism of 2 there are two window threads (subtasks):
1) setParallelism(2)
2) How does each of these two windows read the data of its own partition?
3) Input data:
   1 2 3 4 5 6 7 8 9 10
4) source -> operator -> RecordWriter.emit, which calculates the partition by key:
    ------------------
    channel 0 [partition 0]

        key:1    partition:0
        key:2    partition:0
        key:3    partition:0
        key:4    partition:0
        key:6    partition:0
        key:10   partition:0
    ------------------
    channel 1 [partition 1]

        key:5    partition:1
        key:7    partition:1
        key:8    partition:1
        key:9    partition:1
5) window 0 (1/2)
   How is the current partition calculated?
   How does it get the data of the current partition?

6) window 1 (2/2)
   How is the current partition calculated?
   How does it get the data of the current partition?
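To make the question concrete, here is a small sketch of how, to my understanding, Flink 1.7 decides which keys a window subtask is responsible for. A subtask does not scan or pull a partition. Each subtask index owns a contiguous range of key groups, and the upstream RecordWriter only writes records whose key group falls in that range to the subtask's input channel. The range formula is modelled on KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex. The class and method names below are hypothetical, and the max parallelism of 128 is an assumed default.

```java
// Hypothetical sketch: which key groups each parallel window subtask owns.
public class SubtaskKeyGroupRangeSketch {

    // Inclusive [start, end] range of key groups owned by one subtask.
    static int[] keyGroupRange(int maxParallelism, int parallelism, int operatorIndex) {
        int start = (operatorIndex * maxParallelism + parallelism - 1) / parallelism;
        int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }

    // Formula the sending side uses to pick the target subtask for a key group.
    static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // assumed default
        int parallelism = 2;
        for (int i = 0; i < parallelism; i++) {
            int[] r = keyGroupRange(maxParallelism, parallelism, i);
            // Subtask names in logs are 1-based, e.g. "(1/2)" and "(2/2)".
            System.out.println("window (" + (i + 1) + "/" + parallelism + ") owns key groups "
                    + r[0] + ".." + r[1]);
        }
        // prints:
        // window (1/2) owns key groups 0..63
        // window (2/2) owns key groups 64..127
    }
}
```

Because the sender's formula and the receiver's range are derived from the same maxParallelism and parallelism, every key group the sender routes to subtask i falls inside subtask i's range. The window therefore only ever receives its own keys.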

---------------------------------------------------



Re: [Flink-Question] In Flink parallel computing, how does each window receive the data of its own partition, i.e. how does a window determine which partition number it belongs to?

Posted by Rong Rong <wa...@gmail.com>.
Hi

I am not sure I understand your question correctly, so I will try to
explain how elements get into window operators.

Flink makes the partition assignment before it invokes the operator to
process an element. In the WordCount example, StreamInputProcessor[1] calls
"setKeyContextElement" on the WindowOperator.
The WindowOperator (through the logic it inherits from
AbstractStreamOperator[2]) then sets the actual key, which is ultimately
passed to the KeyedStateBackend[3].

So, by the time the WindowOperator processes an element, the
KeyedStateBackend has already been set to the correct key.
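The call order described above can be modelled roughly as follows. This is a heavily simplified, hypothetical model: the classes are stand-ins, and Flink's real signatures differ (for example, AbstractStreamOperator.setKeyContextElement1 takes a StreamRecord). For each incoming record, the input loop first sets the key context from the record's key, and only then calls processElement. The operator therefore always runs with the backend already pointing at that record's key.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical model of the per-record call order: set key context, then process.
public class KeyContextFlowSketch {

    // Stand-in for the keyed state backend: it only remembers the current key.
    static class Backend {
        Object currentKey;

        void setCurrentKey(Object key) {
            currentKey = key;
        }
    }

    // Stand-in for a keyed operator such as WindowOperator.
    static class Operator<T> {
        final Backend backend = new Backend();
        final Function<T, Object> keySelector;
        final List<String> log = new ArrayList<>();

        Operator(Function<T, Object> keySelector) {
            this.keySelector = keySelector;
        }

        // Roughly what setKeyContextElement does: extract the key, hand it to the backend.
        void setKeyContextElement(T record) {
            backend.setCurrentKey(keySelector.apply(record));
        }

        // By now the backend already points at this record's key.
        void processElement(T record) {
            log.add(record + " processed under key " + backend.currentKey);
        }
    }

    // Stand-in for the input processor's per-record loop.
    static <T> void runInputLoop(Operator<T> op, List<T> records) {
        for (T record : records) {
            op.setKeyContextElement(record);
            op.processElement(record);
        }
    }

    public static void main(String[] args) {
        // Records are "word,1" strings; the key is the word.
        Operator<String> op = new Operator<>(r -> r.split(",")[0]);
        runInputLoop(op, Arrays.asList("a,1", "b,1", "a,1"));
        op.log.forEach(System.out::println);
        // prints:
        // a,1 processed under key a
        // b,1 processed under key b
        // a,1 processed under key a
    }
}
```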

Hope this answers your question.

--
Rong


[1] https://ci.apache.org/projects/flink/flink-docs-release-1.7/api/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.7/api/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.html
