Posted to user@flink.apache.org by KristoffSC <kr...@gmail.com> on 2020/01/28 14:50:54 UTC

Reinterpreting a pre-partitioned data stream as keyed stream

Hi all,
we have a use case where the order of received events matters and must be
preserved across the pipeline.

Our pipeline will be parallelized. We can key the stream just after the
Source operator, but in order to preserve the ordering in the downstream
operators we would have to keep the stream keyed.

Obviously we could key the stream again and again, but this would incur a
performance penalty.
We were thinking about using DataStreamUtils.reinterpretAsKeyedStream
instead.

Since this is experimental functionality, I would like to ask whether
anyone in the community is using this feature. Do we know about any open
issues regarding it?
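
For context, a minimal sketch of what we have in mind (the Event type, its
getKey accessor, and the Enrich mapper are placeholders, and this assumes
Flink's DataStreamUtils from flink-streaming-java):

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.datastream.KeyedStream;

// "events" is assumed to be partitioned exactly as keyBy(Event::getKey)
// would partition it, e.g. because it went through a keyBy upstream and
// only forward (one-to-one) connections since then.
DataStream<Event> events = sourceStream.keyBy(Event::getKey).map(new Enrich());

// Reinterpret the existing partitioning as a keyed stream instead of
// paying for a second keyBy shuffle.
KeyedStream<Event, String> keyed =
        DataStreamUtils.reinterpretAsKeyedStream(events, Event::getKey);
```

If the actual partitioning does not match what keyBy would have produced,
keyed state on the reinterpreted stream can misbehave, which is presumably
part of why the API is marked experimental.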

Thanks,
Krzysztof








--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Reinterpreting a pre-partitioned data stream as keyed stream

Posted by KristoffSC <kr...@gmail.com>.
Hi,
sorry for the long wait.

Answering your questions:
1 - yes
2 - thx
3 - right, understood
4 - well, in general I want to understand how this works, so that in the
future I can modify my job, for example by extracting CPU-heavy operators
into separate tasks. In my current job some operators are chained and some
are not, depending on the logic they execute.





Re: Reinterpreting a pre-partitioned data stream as keyed stream

Posted by Guowei Ma <gu...@gmail.com>.
Hi,

1. Is the key that is used by the keyBy after point 1 precisely the same as
the key used by 4a and 4b? If yes, I think you could use
reinterpretAsKeyedStream to avoid the shuffle.
2. You could use SingleOutputStreamOperator::setChainingStrategy to disable
the chaining, or use rebalance/shuffle between the two operators if you
don't care about the order.
3. If you use unorderedWait, the order will not be preserved even if you
use keyBy after point 1.
4. BTW, why do you not want the operators to chain together?
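
Point 2 can also be expressed with the convenience methods on
SingleOutputStreamOperator that wrap the chaining strategy; a hypothetical
sketch (EnrichFunction and CpuHeavyFunction are placeholder operators):

```java
import org.apache.flink.streaming.api.datastream.DataStream;

// Both methods below configure the chaining strategy of the operator
// they are called on, without changing the partitioning or ordering.
DataStream<Event> result = events
        .map(new EnrichFunction())
        .startNewChain()            // begin a fresh chain at this operator
        .map(new CpuHeavyFunction())
        .disableChaining();         // keep this operator out of any chain
```

This keeps the connections one-to-one (forward), so per-key ordering is
preserved, unlike rebalance/shuffle.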

Best,
Guowei


KristoffSC <kr...@gmail.com> wrote on Thu, Jan 30, 2020 at 7:54 PM:

> [...]

Re: Reinterpreting a pre-partitioned data stream as keyed stream

Posted by KristoffSC <kr...@gmail.com>.
Hi,
thank you for the answer. 

I think I understand. 

In my use case I have to keep the order of events for each key, but I don't
have to process keys in the same order in which I received them. At one
point of my pipeline I'm also using a SessionWindow.

My Flink environment has operator chaining enabled. I would say that some
of my operators can be chained.

My pipeline is (each point is one operator after Flink's operator chaining
mechanism):
1. ActiveMQ connector + mapper, all with parallelism 1 (btw I'm using the
org.apache.bahir connector for ActiveMQ, which does not support parallelism
greater than 1).
2. Enrichment, where I'm using AsyncDataStream.unorderedWait with
parallelism 5.
3. An event split based on some criteria (not a keyBy) that dispatches my
stream into two "sub-streams".
4. Both sub-streams are keyed.
4a. Sub-stream "A" has a session window applied - parallelism 6.
4b. Sub-stream "B" has no windowing and no aggregation, but has business
logic for which the order of events matters - parallelism 6.
5. A sink for both streams.


If I understand you and the documentation correctly, redistributing will
forward messages keeping the order within each key, but events of different
keys can be delivered in a different order:
"So in this example, the ordering within each key is preserved, but the
parallelism does introduce non-determinism regarding the order in which the
aggregated results for different keys arrive at the sink."

Then I could use a keyBy at the beginning of the pipeline, just after
point 1. But to use a window in point 4a and my process function in 4b I
need to have a keyedStream; I'm using a KeyedProcessFunction there. What
are my options here?
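
To make the question concrete, a hypothetical sketch of what I mean for
points 4a/4b (Event, getKey, and the split at point 3 are placeholders):

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.datastream.KeyedStream;

// streamA / streamB are the two sub-streams produced by the split at
// point 3, assumed still partitioned as the earlier keyBy left them.
// Reinterpreting recovers a KeyedStream without a second shuffle; this
// is only safe if every connection since the keyBy is forward
// (same parallelism, no rebalance/shuffle in between).
KeyedStream<Event, String> keyedA =
        DataStreamUtils.reinterpretAsKeyedStream(streamA, Event::getKey);
KeyedStream<Event, String> keyedB =
        DataStreamUtils.reinterpretAsKeyedStream(streamB, Event::getKey);
// keyedA can now take the session window, and keyedB the
// KeyedProcessFunction, as on a normal keyed stream.
```

One caveat I see myself: with parallelism 1 -> 5 -> 6 between points 1, 2,
and 4, those connections are rebalanced rather than forward, so the keyBy
would presumably have to sit after the last parallelism change for this to
hold.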


P.S.
Regarding operator chaining, I'm aware that there is an API that allows me
to control which operators should be chained together and which should not,
even if they have the same parallelism.

Thanks,
Krzysztof




Re: Reinterpreting a pre-partitioned data stream as keyed stream

Posted by Guowei Ma <gu...@gmail.com>.
Hi, Krzysztof


When you use *reinterpretAsKeyedStream* you must yourself guarantee that
the partitioning is exactly the same as the one Flink would produce. But
before going any further, I think we should check whether the normal
DataStream API can satisfy your requirements without
*reinterpretAsKeyedStream*.


An operator can send its output to another operator in two ways:
one-to-one (forward) or redistributing [1]. With a one-to-one (forward)
connection, the partitioning and the order of events stay the same between
the two operators. Two operators are connected with forward by default if
they have the same parallelism.
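
As a toy model of that guarantee (plain Java, not Flink code): hashing
each event's key onto a channel, the way a keyBy redistribution does, keeps
all events of one key on a single channel in their input order, while
different keys may land on different channels, so only per-key order
survives the redistribution.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of redistributing by key: each event goes to channel
// hash(key) % parallelism, so all events of one key stay on a single
// channel in their original order, while different keys may be split
// across channels.
public class KeyedPartitionDemo {

    // Events are strings of the form "key:payload".
    public static Map<Integer, List<String>> partitionByKey(
            List<String> events, int parallelism) {
        Map<Integer, List<String>> channels = new HashMap<>();
        for (String event : events) {
            String key = event.split(":")[0];
            int channel = Math.floorMod(key.hashCode(), parallelism);
            channels.computeIfAbsent(channel, c -> new ArrayList<>()).add(event);
        }
        return channels;
    }

    public static void main(String[] args) {
        List<String> events = List.of("a:1", "b:1", "a:2", "b:2", "a:3");
        // Within each channel, the input order of each key is preserved,
        // but the interleaving of "a" and "b" events across channels is not.
        System.out.println(partitionByKey(events, 3));
    }
}
```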

Without the full details, I think you might be able to just *keyBy* once if
your job has no special needs. Or you could share what your job looks like,
if that is convenient.



[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/concepts/programming-model.html#parallel-dataflows

Best,
Guowei


KristoffSC <kr...@gmail.com> wrote on Tue, Jan 28, 2020 at 10:47 PM:

> [...]