Posted to user@flink.apache.org by Nicholas Walton <nw...@me.com> on 2018/07/17 16:09:20 UTC

Parallel stream partitions

Suppose I have a data stream of tuples <tick: Int, key: Int, value: Double>, with the sequence of ticks being 1, 2, 3, … for each separate key.

I understand that keyBy(2) will partition the stream so that each partition has the same key in each tuple. I now have a sequence of functions to apply to the streams, say f(), g() and h(), in that order.

With parallelism set to 1, each partition-stream passes through f, then g, then h (f | g | h) in order of tick.

I want to run each partition-stream in parallel, setting parallelism in the Web GUI. 

My question is: how do I ensure that each partition-stream passes through a fixed sequence (f | g | h)? With parallelism p there are p instances each of f, g and h, and I see no guarantee that each partition-stream flows through a unique set of three instances in tick order, especially if p is greater than the largest value of key.

A typical use case would be to maintain a moving average over each key.

<1*Xjd2gfMhYqx0sIvAISR47A.png>

I need to remove the crossover in the middle box, so [1] -> [1] -> [1] and [2] -> [2] -> [2], instead of [1] -> [1] -> [1 or 2].

Nick

Re: Parallel stream partitions

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Nick,

What Ken said is correct, but let me add two more things.

1) State
Usually, you only need to partition (keyBy()) the data if you want to
process tuples with the same key together.
Doing so usually means holding some tuples or intermediate results
(like partial or running aggregates) in state. Flink is a stateful stream
processor and offers many features around state management.
One of them is keyed state, i.e., state that is maintained per key. When a
function processes a tuple, keyed state is automatically put into the
context of the current key. Because the state is always associated with a
key, it is not a problem that a function instance processes multiple keys.
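
To illustrate the point about keyed state, here is a minimal plain-Java sketch (not Flink code) of a single function instance that computes a moving average for several keys at once. The class name KeyedMovingAverage, the window size and the two maps are assumptions made for illustration; in Flink the per-key lookup would be done for you by keyed state rather than by an explicit map.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Plain-Java stand-in for one function instance that keeps state per key. */
final class KeyedMovingAverage {
    private final int window;
    // Recent values and their running sum, stored separately for every key.
    // These maps play the role that keyed state plays in Flink.
    private final Map<Integer, Deque<Double>> recent = new HashMap<>();
    private final Map<Integer, Double> sums = new HashMap<>();

    KeyedMovingAverage(int window) {
        this.window = window;
    }

    /** Processes one tuple and returns the updated moving average for its key. */
    double process(int key, double value) {
        Deque<Double> values = recent.computeIfAbsent(key, k -> new ArrayDeque<>());
        double sum = sums.getOrDefault(key, 0.0) + value;
        values.addLast(value);
        if (values.size() > window) {
            sum -= values.removeFirst(); // drop the oldest value of this key only
        }
        sums.put(key, sum);
        return sum / values.size();
    }

    public static void main(String[] args) {
        KeyedMovingAverage avg = new KeyedMovingAverage(2);
        // Tuples for keys 1 and 2 arrive interleaved at the same instance.
        System.out.println(avg.process(1, 10.0));  // 10.0
        System.out.println(avg.process(2, 100.0)); // 100.0
        System.out.println(avg.process(1, 20.0));  // 15.0
        System.out.println(avg.process(2, 200.0)); // 150.0
        System.out.println(avg.process(1, 30.0));  // 25.0
    }
}
```

The averages for key 1 are unaffected by the values of key 2, even though both keys share one instance.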

2) Ordering
In a parallel system it is very expensive to reason about or guarantee
ordering. Flink only ensures that tuples that flow through a partition are
processed in order. However, order across different partitions cannot be
guaranteed. Hence, shuffles (due to keyBy or changed parallelism) can
change the order.
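
As a rough illustration of the ordering point, the following plain-Java sketch (again not Flink code) routes {tick, key} records to partitions and then reads the partitions back one after another. The modulo routing and the concatenation are assumptions chosen to keep the example deterministic; a real job would interleave partitions in some other, unpredictable way.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class OrderingSketch {
    /** Routes {tick, key} records to partitions, keeping arrival order inside each one. */
    static List<List<int[]>> shuffle(List<int[]> records, int parallelism) {
        List<List<int[]>> partitions = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            partitions.add(new ArrayList<>());
        }
        for (int[] r : records) {
            partitions.get(Math.floorMod(r[1], parallelism)).add(r);
        }
        return partitions;
    }

    /** Reads the partitions one after another: one possible downstream arrival order. */
    static List<int[]> concat(List<List<int[]>> partitions) {
        List<int[]> merged = new ArrayList<>();
        for (List<int[]> p : partitions) {
            merged.addAll(p);
        }
        return merged;
    }

    /** True if ticks are strictly increasing for every individual key. */
    static boolean ticksIncreasePerKey(List<int[]> records) {
        Map<Integer, Integer> lastTick = new HashMap<>();
        for (int[] r : records) {
            Integer prev = lastTick.put(r[1], r[0]);
            if (prev != null && prev >= r[0]) {
                return false;
            }
        }
        return true;
    }

    /** True if ticks never decrease across the whole sequence, ignoring keys. */
    static boolean ticksSortedGlobally(List<int[]> records) {
        for (int i = 1; i < records.size(); i++) {
            if (records.get(i - 1)[0] > records.get(i)[0]) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Ticks 1..3 for keys 1 and 2, interleaved in tick order.
        List<int[]> input = Arrays.asList(
                new int[]{1, 1}, new int[]{1, 2},
                new int[]{2, 1}, new int[]{2, 2},
                new int[]{3, 1}, new int[]{3, 2});
        List<int[]> output = concat(shuffle(input, 2));
        System.out.println(ticksIncreasePerKey(output)); // true
        System.out.println(ticksSortedGlobally(output)); // false
    }
}
```

Per-key tick order survives the shuffle, while the order across keys does not.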

Best,
Fabian

2018-07-18 1:50 GMT+02:00 Ken Krugler <kk...@transpac.com>:

> [...]


Re: Parallel stream partitions

Posted by Ken Krugler <kk...@transpac.com>.
Hi Nick,

> On Jul 17, 2018, at 9:09 AM, Nicholas Walton <nw...@me.com> wrote:
> 
> Suppose I have a data stream of tuples <tick: Int, key: Int, Value: Double> with the sequence of ticks being 1,2,3,…. for each separate k.
> 
> I understand and keyBy(2)

I think you want keyBy(1), since it’s 0-based.

> will partition the stream so each partition has the same key in each tuple.

I don’t think that’s exactly correct.

Each tuple with the same key value will be in the same partition. But each partition can receive multiple key values, depending on the cardinality of the keys, the number of partitions, and how they get hashed.
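
A small plain-Java sketch of the idea, for illustration only: partitionFor below is a hypothetical stand-in (a simple modulo), not the function Flink actually uses, but it shows the same two properties. A given key always lands in the same partition, and one partition can hold several keys.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

final class PartitionSketch {
    /** Hypothetical stand-in for a hash partitioner; not Flink's actual function. */
    static int partitionFor(int key, int parallelism) {
        return Math.floorMod(Integer.hashCode(key), parallelism);
    }

    /** Groups the given keys by the partition each one is assigned to. */
    static Map<Integer, Set<Integer>> assign(int[] keys, int parallelism) {
        Map<Integer, Set<Integer>> byPartition = new TreeMap<>();
        for (int key : keys) {
            byPartition
                    .computeIfAbsent(partitionFor(key, parallelism), p -> new TreeSet<>())
                    .add(key);
        }
        return byPartition;
    }

    public static void main(String[] args) {
        // Seven keys over three partitions: every partition receives several keys.
        System.out.println(assign(new int[]{1, 2, 3, 4, 5, 6, 7}, 3));
        // prints {0=[3, 6], 1=[1, 4, 7], 2=[2, 5]}

        // Two keys over four partitions: only two partitions receive data.
        System.out.println(assign(new int[]{1, 2}, 4));
        // prints {1=[1], 2=[2]}
    }
}
```

The second call covers the case where the parallelism is larger than the number of keys: the extra partitions simply receive nothing.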

> I now have a sequence of functions to apply to the streams say f(),g() and h() in that order. 

Assuming these functions are all applied after the partitioning, I would expect all tuples with the same key to be processed by the function instances running in that same partition.

So .keyBy(1).map(f).map(g).map(h) should partition by the key, and then chain the processing of tuples.
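
Here is a plain-Java sketch of what I mean by chaining, under the assumption that f, g and h all run with the same parallelism and nothing repartitions the stream between them. The names chain and run, and the modulo routing, are made up for the example and are not Flink APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

final class ChainSketch {
    /** Builds one f | g | h chain whose three stages share the same instance index. */
    static UnaryOperator<String> chain(int index) {
        UnaryOperator<String> f = s -> s + " -> f[" + index + "]";
        UnaryOperator<String> g = s -> s + " -> g[" + index + "]";
        UnaryOperator<String> h = s -> s + " -> h[" + index + "]";
        return s -> h.apply(g.apply(f.apply(s)));
    }

    /** Routes each key to exactly one chain and records the stages it passed through. */
    static List<String> run(int[] keys, int parallelism) {
        List<UnaryOperator<String>> chains = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            chains.add(chain(i));
        }
        List<String> traces = new ArrayList<>();
        for (int key : keys) {
            // Stand-in for the hash partitioning done once, by keyBy.
            int index = Math.floorMod(key, parallelism);
            traces.add(chains.get(index).apply("key " + key));
        }
        return traces;
    }

    public static void main(String[] args) {
        for (String trace : run(new int[]{1, 2, 1}, 2)) {
            System.out.println(trace);
        }
        // key 1 -> f[1] -> g[1] -> h[1]
        // key 2 -> f[0] -> g[0] -> h[0]
        // key 1 -> f[1] -> g[1] -> h[1]
    }
}
```

Each key is routed once and then stays with the same instance index through all three stages, so there is no crossover between f, g and h.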

— Ken


--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra