Posted to user@flink.apache.org by Tommy May <tv...@gmail.com> on 2023/03/03 22:56:39 UTC

Avoiding data shuffling when reading pre-partitioned data from Kafka

Hello,

My team has a Flink streaming job that does a stateful join across two
high-throughput Kafka topics. This results in a large amount of data ser/de
and shuffling (about 1gb/s for context), and we're running into a bottleneck
on this shuffling step. We've tried optimizing our Flink configuration and
join logic, scaling out the Kafka topics and the Flink job, and speeding up
state access. What we see is that the join step causes backpressure on the
Kafka sources, and lag slowly starts to accumulate.

One idea we had is to pre-partition the data in Kafka on the same key that
the join uses. In principle this would reduce data shuffling to zero and
remove the bottleneck that we're seeing. I've done some research into the
topic, and from what I understand this is not straightforward to take
advantage of in Flink. It looks to be a fairly commonly requested feature,
based on the many StackOverflow posts and Slack questions, and I noticed
there is FLIP-186, which attempts to address this topic as well.

Are there any upcoming plans to add this feature to a future Flink release?
I believe it'd be super impactful for similar large scale jobs out there.
I'd be interested in helping as well, but admittedly I'm relatively new to
Flink.  I poked around the code a bit, and it certainly did not seem like a
straightforward addition, so it may be best handled by someone with more
internal knowledge.

Thanks,
Tommy

Re: Avoiding data shuffling when reading pre-partitioned data from Kafka

Posted by David Morávek <dm...@apache.org>.
> That comes with the additional constraints that Ken mentioned, correct?
> It could break immediately if a key comes through on a different
> partition, or if the number of partitions happens to change? I'm concerned
> about that for our use case as we don't have 100% control of the upstream
> data source.

There is no way to avoid shuffling if the partitioning is unstable (except
for a map-side join, which is a special case that only applies when one
side of the join fits in memory).
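
For readers unfamiliar with the term, a map-side join can be sketched roughly
as below. This is plain Java rather than Flink API, and it assumes the small
side fits in memory on every worker; the names `MapSideJoin`, `Order` and
`Enriched` are hypothetical. Because each worker holds a full copy of the
small side, a record of the large side can be joined wherever it happens to
arrive, so an unstable partitioning does not matter.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration of a map-side join; plain Java, not Flink API. */
final class MapSideJoin {

    /** A record from the large side (field names are made up for this sketch). */
    static final class Order {
        final String customerId;
        final long amount;

        Order(String customerId, long amount) {
            this.customerId = customerId;
            this.amount = amount;
        }
    }

    /** Join result: an order plus the customer name looked up from the small side. */
    static final class Enriched {
        final String customerId;
        final long amount;
        final String customerName;

        Enriched(String customerId, long amount, String customerName) {
            this.customerId = customerId;
            this.amount = amount;
            this.customerName = customerName;
        }
    }

    // Full copy of the small side, held in memory by every parallel worker.
    private final Map<String, String> customerNames;

    MapSideJoin(Map<String, String> customerNames) {
        this.customerNames = new HashMap<>(customerNames);
    }

    /** Inner-joins large-side records against the in-memory small side, without repartitioning. */
    List<Enriched> join(List<Order> orders) {
        List<Enriched> out = new ArrayList<>();
        for (Order o : orders) {
            String name = customerNames.get(o.customerId);
            if (name != null) { // orders without a matching customer are dropped
                out.add(new Enriched(o.customerId, o.amount, name));
            }
        }
        return out;
    }
}
```

In a Flink job the same idea would usually be expressed by broadcasting the
small side to all subtasks; the sketch only shows why no key-based shuffle is
needed in that case.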

Best,
D.


RE: Avoiding data shuffling when reading pre-partitioned data from Kafka

Posted by Schwalbe Matthias <Ma...@viseca.ch>.
Hi Tommy,

While I can’t offer a sure solution, I’ve got a number of ideas on how to continue and shed light on the matter:


  *   With respect to diagnostics, have you enabled flame graphs (rest.flamegraph.enabled)?
     *   They let you see the call tree of each task and where most of the time is spent
     *   That usually gives me quite some insight
  *   You mention serialization could be a problem:
     *   Which serialization are you using currently?
     *   I could imagine using one of the (almost) zero-copy types, such as RowData
        *   I considered this once but didn’t try it
     *   Nico published a nice comparison of the serializer choices [1]
  *   Just for completeness: pipeline.object-reuse can cut down quite a bit on GC cost, at the price of needing more discipline around mutating objects and caching non-serialized objects in arbitrary data structures
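
As a configuration sketch, the two options mentioned in the list could be set in flink-conf.yaml roughly as follows. The flame graph option is documented as rest.flamegraph.enabled; the defaults and exact behaviour depend on the Flink version, so treat this as something to verify rather than a recommended setup.

```yaml
# flink-conf.yaml (sketch; verify option names and defaults for your Flink version)
rest.flamegraph.enabled: true   # show the Flame Graph tab for tasks in the web UI
pipeline.object-reuse: true     # skip defensive copies between chained operators;
                                # only safe if user functions do not mutate or cache inputs
```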

Hope this helps

Thias




[1] https://flink.apache.org/2020/04/15/flink-serialization-tuning-vol.-1-choosing-your-serializer-if-you-can/




On Sun, Mar 5, 2023 at 5:31 AM Ken Krugler <kk...@transpac.com> wrote:
Hi Tommy,

To use stateful timers, you need to have a keyed stream, which gets tricky when you’re trying to avoid network traffic caused by the keyBy().

If the number of unique keys isn’t huge, I could think of yet another helicopter stunt that you could try :)

It’s possible to calculate a composite key, based on the “real” key and a synthetic value, that will wind up in the same slot where you’re doing this calculation.

So that would let you create a keyed stream which would have serialization/deserialization cost, but wouldn’t actually go through the network stack.

Since the composite key generation is deterministic, you can do the same thing on both streams, and join on the composite key.

You’d want to cache the mapping from the real key to the synthetic value, to avoid doing this calculation for every record.
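
No code for this idea was posted in this part of the thread, so the following is only a guess at what such a composite key could look like. It is plain Java, and `subtaskFor` re-implements from memory how Flink is assumed to map key.hashCode() to a key group and then to a subtask index (a murmur hash modulo the max parallelism); check it against KeyGroupRangeAssignment and MathUtils in your Flink version before relying on it. The class name `CompositeKeys`, the "realKey#salt" format and the search bound are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of the composite-key idea; not code from the thread. */
final class CompositeKeys {
    private static final int MAX_TRIES = 1_000_000; // arbitrary safety bound

    private final int maxParallelism;
    private final int parallelism;
    private final int targetSubtask; // index of the subtask running this code
    // Cache real key -> composite key, so the search runs once per key.
    private final Map<String, String> cache = new HashMap<>();

    CompositeKeys(int maxParallelism, int parallelism, int targetSubtask) {
        this.maxParallelism = maxParallelism;
        this.parallelism = parallelism;
        this.targetSubtask = targetSubtask;
    }

    // ASSUMPTION: mirrors Flink's MathUtils.murmurHash(int); written from memory.
    static int murmurHash(int code) {
        code *= 0xcc9e2d51;
        code = Integer.rotateLeft(code, 15);
        code *= 0x1b873593;
        code = Integer.rotateLeft(code, 13);
        code = code * 5 + 0xe6546b64;
        code ^= 4;
        code ^= code >>> 16;
        code *= 0x85ebca6b;
        code ^= code >>> 13;
        code *= 0xc2b2ae35;
        code ^= code >>> 16;
        if (code >= 0) {
            return code;
        }
        return code != Integer.MIN_VALUE ? -code : 0;
    }

    /** Subtask index that keyBy() is assumed to pick for the given key. */
    int subtaskFor(Object key) {
        int keyGroup = murmurHash(key.hashCode()) % maxParallelism;
        return keyGroup * parallelism / maxParallelism;
    }

    /** Deterministically finds a salt so that the composite key maps to targetSubtask. */
    String compositeKey(String realKey) {
        return cache.computeIfAbsent(realKey, k -> {
            for (int salt = 0; salt < MAX_TRIES; salt++) {
                String candidate = k + "#" + salt;
                if (subtaskFor(candidate) == targetSubtask) {
                    return candidate;
                }
            }
            throw new IllegalStateException("no salt found for key " + k);
        });
    }
}
```

Because the search always starts at salt 0, two streams that see the same real key on the same subtask derive the same composite key, which is what makes the join on the composite key possible. The cache grows with the number of distinct keys, which is presumably why the approach only suits a modest key count.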

If that sounds promising, lmk and I can post some code.

— Ken


On Mar 4, 2023, at 12:37 PM, Tommy May <tv...@gmail.com> wrote:

Hello Ken,

Thanks for the quick response! That is an interesting workaround. In our case, though, we are using a CoProcessFunction with stateful timers. Is there a similar workaround available in that case? The one possible way I could find required partitioning the data in Kafka in a very specific way to match what Flink's keyBy is doing, and it would add constraints on top of the ones you described that would be difficult to handle in a prod environment where we don't have full control over the producers & input topics.

Regarding the addition of a more flexible way to take advantage of pre-partitioned sources like in FLIP-186, would you suggest I forward this chain over to the dev Flink mailing list?

Thanks,
Tommy



On Sat, Mar 4, 2023 at 11:32 AM Ken Krugler <kk...@transpac.com> wrote:
Hi Tommy,

I believe there is a way to make this work currently, but with lots of caveats and constraints.

This assumes you want to avoid any network shuffle.

1. Both topics have names that return the same value for ((topicName.hashCode() * 31) & 0x7FFFFFFF) % parallelism.
2. Both topics have the same number of partitions.
3. The parallelism of your join function exactly matches the number of partitions.
4. You can’t change any of the above without losing state.
5. You don’t need stateful timers.

If the above is true, then you could use a CoFlatMapFunction and operator state to implement a stateful join.

If it’s something like a left outer join without any state TTL or need to keep both sides in state, then it’s pretty easy.

— Ken

PS - it’s pretty easy to figure out a “-xxx” value to append to a topic name to get the hashCode() result you need.
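
The PS can be sketched as a brute-force search. This is an illustration only: it takes the expression from item 1 as given, with the mask 0x7FFFFFFF (which clears just the sign bit) as an assumption about what the Kafka connector computes. The class and method names, the topic names and the "-<n>" suffix format are hypothetical.

```java
/** Hypothetical helper: find a topic-name suffix that yields a wanted start index. */
final class TopicSuffixFinder {

    /** Start index per the expression in item 1 (the mask is an assumption). */
    static int startIndex(String topicName, int parallelism) {
        return ((topicName.hashCode() * 31) & 0x7FFFFFFF) % parallelism;
    }

    /** Returns baseName + "-" + n for the smallest n whose start index equals wantedIndex. */
    static String withMatchingSuffix(String baseName, int wantedIndex, int parallelism) {
        for (int n = 0; n < 1_000_000; n++) {
            String candidate = baseName + "-" + n;
            if (startIndex(candidate, parallelism) == wantedIndex) {
                return candidate;
            }
        }
        throw new IllegalStateException("no suffix found for " + baseName);
    }

    public static void main(String[] args) {
        int parallelism = 8;
        // Rename the second topic so it shares the first topic's start index.
        int wanted = startIndex("orders", parallelism);
        String renamed = withMatchingSuffix("payments", wanted, parallelism);
        System.out.println(renamed + " -> start index " + startIndex(renamed, parallelism));
    }
}
```

Note that the result depends on the parallelism, so a suffix chosen this way is only valid as long as the parallelism stays fixed, which is consistent with constraint 4 above.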



--------------------------
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch





Re: Avoiding data shuffling when reading pre-partitioned data from Kafka

Posted by Tommy May <tv...@gmail.com>.
Hi Ken & David,

Thanks for following up. I've responded to your questions below.

> If the number of unique keys isn’t huge, I could think of yet another
> helicopter stunt that you could try :)


Unfortunately the number of keys in our case is huge; they're unique per
handful of events.

> If your data are already pre-partitioned and the partitioning matches (hash
> partitioning on the JAVA representation of the key yielded by the
> KeySelector), you can use `reinterpretAsKeyedStream` [1] to skip the
> shuffle.


That comes with the additional constraints that Ken mentioned, correct? It
could break immediately if a key comes through on a different partition, or
if the number of partitions happens to change? I'm concerned about that for
our use case as we don't have 100% control of the upstream data source.

> I feel you'd be blocked by the state access downstream (with RocksDB). Are
> you sure it isn't the case?


Yes, you are right that state access is also a limiting factor, and some
optimizations to limit that have helped quite a bit (both in our
implementation and in using local SSDs for RocksDB). One other path we
looked at is using memory-backed volumes for RocksDB, but we ran into a
limitation that we cannot configure Flink's process memory lower than the
k8s container memory, leading to OOMs. More details at
https://stackoverflow.com/questions/74118022/flink-pods-ooming-using-memory-backed-volume-with-k8s-operator

I don't currently have a dashboard that points to data shuffling as the
primary bottleneck, but I thought it could be a huge optimization if we can
tell Flink to take advantage of the pre-partitioned data source, given
we're shuffling near 1 Gb/sec right now. I can see that the join is causing
the backpressure on the sources though, and figured that network and state
access would be the two primary contributors there. Let me know if you have
any good debugging tools to narrow in on this more.

Thanks,
Tommy



Re: Avoiding data shuffling when reading pre-partitioned data from Kafka

Posted by David Morávek <dm...@apache.org>.
Using operator state for a stateful join isn't great because it's meant
to hold only minimal state related to the operator itself (e.g., partition
tracking).

If your data are already pre-partitioned and the partitioning matches (hash
partitioning on the Java representation of the key yielded by the
KeySelector), you can use `reinterpretAsKeyedStream` [1] to skip the
shuffle.
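
To make "the partitioning matches" more concrete, here is a plain-Java sketch
of the mapping that the records would have to respect. It assumes that after
a keyBy() Flink routes a record to a subtask derived from a murmur hash of
key.hashCode(); the hash below is re-implemented from memory, so compare it
with KeyGroupRangeAssignment and MathUtils in your Flink version. The class
`KeyPlacementCheck` and its method names are hypothetical.

```java
/** Hypothetical check of whether a key already sits on the subtask keyBy() would choose. */
final class KeyPlacementCheck {

    // ASSUMPTION: mirrors Flink's MathUtils.murmurHash(int); written from memory.
    static int murmurHash(int code) {
        code *= 0xcc9e2d51;
        code = Integer.rotateLeft(code, 15);
        code *= 0x1b873593;
        code = Integer.rotateLeft(code, 13);
        code = code * 5 + 0xe6546b64;
        code ^= 4;
        code ^= code >>> 16;
        code *= 0x85ebca6b;
        code ^= code >>> 13;
        code *= 0xc2b2ae35;
        code ^= code >>> 16;
        if (code >= 0) {
            return code;
        }
        return code != Integer.MIN_VALUE ? -code : 0;
    }

    /** Key group of a key: murmur hash of its hashCode(), modulo the max parallelism. */
    static int keyGroupFor(Object key, int maxParallelism) {
        return murmurHash(key.hashCode()) % maxParallelism;
    }

    /** Subtask index that owns the key group of the given key. */
    static int subtaskFor(Object key, int maxParallelism, int parallelism) {
        return keyGroupFor(key, maxParallelism) * parallelism / maxParallelism;
    }

    /** True if a record read by actualSubtask is where keyBy() is assumed to put it. */
    static boolean isCorrectlyPlaced(Object key, int actualSubtask,
                                     int maxParallelism, int parallelism) {
        return subtaskFor(key, maxParallelism, parallelism) == actualSubtask;
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        int parallelism = 4;
        for (String key : new String[] {"user-1", "user-2", "user-3"}) {
            System.out.println(key + " -> subtask "
                    + subtaskFor(key, maxParallelism, parallelism));
        }
    }
}
```

Kafka's default partitioner hashes the serialized key bytes with murmur2,
which is a different function from the one above, so topics written by a
stock producer will generally not satisfy this check without a custom
partitioner on the producer side.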

> What we see is that the join step causes backpressure on the kafka
> sources and lag slowly starts to accumulate.

I feel you'd be blocked by the state access downstream (with RocksDB). Are
you sure that isn't the case?

[1]
https://javadoc.io/static/org.apache.flink/flink-streaming-java/1.16.1/org/apache/flink/streaming/api/datastream/DataStreamUtils.html#reinterpretAsKeyedStream-org.apache.flink.streaming.api.datastream.DataStream-org.apache.flink.api.java.functions.KeySelector-

Best,
D.

On Sun, Mar 5, 2023 at 5:31 AM Ken Krugler <kk...@transpac.com>
wrote:

> Hi Tommy,
>
> To use stateful timers, you need to have a keyed stream, which gets tricky
> when you’re trying to avoid network traffic caused by the keyBy().
>
> If the number of unique keys isn’t huge, I could think of yet another
> helicopter stunt that you could try :)
>
> It’s possible to calculate a composite key, based on the “real” key and a
> synthetic value, that will wind up in the same slot where you’re doing
> this calculation.
>
> So that would let you create a keyed stream which would have
> serialization/deserialization cost, but wouldn’t actually go through the
> network stack.
>
> Since the composite key generation is deterministic, you can do the same
> thing on both streams, and join on the composite key.
>
> You’d want to cache the mapping from the real key to the synthetic value,
> to avoid doing this calculation for every record.
>
> If that sounds promising, lmk and I can post some code.
>
> — Ken

Re: Avoiding data shuffling when reading pre-partitioned data from Kafka

Posted by Ken Krugler <kk...@transpac.com>.
Hi Tommy,

To use stateful timers, you need to have a keyed stream, which gets tricky when you’re trying to avoid network traffic caused by the keyBy().

If the number of unique keys isn’t huge, I could think of yet another helicopter stunt that you could try :)

It’s possible to calculate a composite key, based on the “real” key and a synthetic value, that will wind up in the same slot where you’re doing this calculation.

So that would let you create a keyed stream which would have serialization/deserialization cost, but wouldn’t actually go through the network stack.

Since the composite key generation is deterministic, you can do the same thing on both streams, and join on the composite key.

You’d want to cache the mapping from the real key to the synthetic value, to avoid doing this calculation for every record.

If that sounds promising, lmk and I can post some code.
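
In the meantime, the idea can be sketched roughly as follows, in plain Java with no Flink dependency. Everything named here (`LocalCompositeKeys`, the `#` separator, the integer "salt" as the synthetic value, the toy routing function) is a hypothetical stand-in. In a real job the routing function would presumably wrap `KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, parallelism)` and the local subtask index would come from the runtime context.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.ToIntFunction;

class LocalCompositeKeys {
    private static final int MAX_TRIES = 100_000;

    private final ToIntFunction<String> routing; // composite key -> subtask index
    private final int localSubtask;              // subtask doing the calculation
    private final Map<String, Integer> saltByRealKey = new HashMap<>();

    LocalCompositeKeys(ToIntFunction<String> routing, int localSubtask) {
        this.routing = routing;
        this.localSubtask = localSubtask;
    }

    /** Returns realKey plus a synthetic suffix that routes back to this subtask. */
    String compositeKeyFor(String realKey) {
        // Cache the salt so the search runs once per real key, not per record.
        int salt = saltByRealKey.computeIfAbsent(realKey, this::findSalt);
        return realKey + "#" + salt;
    }

    // Deterministic search: always returns the smallest salt that fits.
    private int findSalt(String realKey) {
        for (int salt = 0; salt < MAX_TRIES; salt++) {
            if (routing.applyAsInt(realKey + "#" + salt) == localSubtask) {
                return salt;
            }
        }
        throw new IllegalStateException("no salt found for key " + realKey);
    }

    public static void main(String[] args) {
        // ASSUMPTION: toy routing over 4 subtasks; this is not Flink's hash.
        ToIntFunction<String> toyRouting = k -> Math.floorMod(k.hashCode(), 4);
        LocalCompositeKeys keys = new LocalCompositeKeys(toyRouting, 2);
        System.out.println(keys.compositeKeyFor("user-42"));
    }
}
```

Because the search is deterministic, two instances on the same subtask (one per input stream) produce the same composite key for the same real key, which is what makes the join on the composite key line up. The cache grows with the number of unique keys, hence the caveat about that number not being huge.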

— Ken


> On Mar 4, 2023, at 12:37 PM, Tommy May <tv...@gmail.com> wrote:
> 
> Hello Ken,
> 
> Thanks for the quick response! That is an interesting workaround. In our case though we are using a CoProcessFunction with stateful timers. Is there a similar workaround path available in that case? The one possible way I could find requires partitioning the data in kafka in a very specific way to match what Flink's keyBy is doing, and it'd add constraints on top of those in the method you described that would be difficult to handle in a prod environment where we don't have full control over the producers & input topics.
> 
> Regarding the addition of a more flexible way to take advantage of pre-partitioned sources like in FLIP-186, would you suggest I forward this chain over to the dev Flink mailing list? 
> 
> Thanks,
> Tommy

--------------------------
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch




Re: Avoiding data shuffling when reading pre-partitioned data from Kafka

Posted by Tommy May <tv...@gmail.com>.
Hello Ken,

Thanks for the quick response! That is an interesting workaround. In our
case though we are using a CoProcessFunction with stateful timers. Is there
a similar workaround path available in that case? The one possible way I
could find requires partitioning the data in kafka in a very specific way
to match what Flink's keyBy is doing, and it'd add constraints on top of
those in the method you described that would be difficult to handle
in a prod environment where we don't have full control over the producers &
input topics.

Regarding the addition of a more flexible way to take advantage of
pre-partitioned sources like in FLIP-186, would you suggest I forward this
chain over to the dev Flink mailing list?

Thanks,
Tommy



On Sat, Mar 4, 2023 at 11:32 AM Ken Krugler <kk...@transpac.com>
wrote:

> Hi Tommy,
>
> I believe there is a way to make this work currently, but with lots of
> caveats and constraints.
>
> This assumes you want to avoid any network shuffle.
>
> 1. Both topics have names that return the same value for
> ((topicName.hashCode() * 31) & 0x7FFFFFFF) % parallelism.
> 2. Both topics have the same number of partitions.
> 3. The parallelism of your join function exactly matches the number of
> partitions.
> 4. You can’t change any of the above without losing state.
> 5. You don’t need stateful timers.
>
> If the above is true, then you could use a CoFlatMapFunction and operator
> state to implement a stateful join.
>
> If it’s something like a left outer join without any state TTL or need to
> keep both sides in state, then it’s pretty easy.
>
> — Ken
>
> PS - it’s pretty easy to figure out a “-xxx” value to append to a topic
> name to get the hashCode() result you need.

Re: Avoiding data shuffling when reading pre-partitioned data from Kafka

Posted by Ken Krugler <kk...@transpac.com>.
Hi Tommy,

I believe there is a way to make this work currently, but with lots of caveats and constraints.

This assumes you want to avoid any network shuffle.

1. Both topics have names that return the same value for ((topicName.hashCode() * 31) & 0x7FFFFFFF) % parallelism.
2. Both topics have the same number of partitions.
3. The parallelism of your join function exactly matches the number of partitions.
4. You can’t change any of the above without losing state.
5. You don’t need stateful timers.

If the above is true, then you could use a CoFlatMapFunction and operator state to implement a stateful join.

If it’s something like a left outer join without any state TTL or need to keep both sides in state, then it’s pretty easy.
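
A minimal sketch of that easy case, in plain Java with no Flink dependency: only the right side is remembered, and each left record is emitted immediately with whatever right value is known (or null). `OneSidedLeftJoin`, `onLeft` and `onRight` are hypothetical names standing in for the two callbacks of a CoFlatMapFunction, and the HashMap stands in for operator state; checkpointing and state redistribution are not modeled here.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

class OneSidedLeftJoin {
    // Stand-in for operator state: latest right-side value per key.
    private final Map<String, String> rightByKey = new HashMap<>();

    /** Right input: remember the value, emit nothing. */
    void onRight(String key, String rightValue) {
        rightByKey.put(key, rightValue);
    }

    /** Left input: emit {key, left, right}; right is null if no match is known yet. */
    String[] onLeft(String key, String leftValue) {
        return new String[] {key, leftValue, rightByKey.get(key)};
    }

    public static void main(String[] args) {
        OneSidedLeftJoin join = new OneSidedLeftJoin();
        join.onRight("k1", "r1");
        System.out.println(Arrays.toString(join.onLeft("k1", "l1")));
        System.out.println(Arrays.toString(join.onLeft("k2", "l2")));
    }
}
```

This only works because both inputs for a given key arrive at the same parallel instance, which is exactly what constraints 1 to 3 above are meant to guarantee.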

— Ken

PS - it’s pretty easy to figure out a “-xxx” value to append to a topic name to get the hashCode() result you need.
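
A rough sketch of that suffix search, in plain Java. The formula is the one from item 1, with the mask written as `0x7FFFFFFF` (clearing only the sign bit), which is understood to be what the Kafka connector uses; treat that literal as an assumption and check the connector version you run. The class name, method names and the topic names "orders" and "payments" are hypothetical.

```java
class TopicSuffixFinder {

    // ASSUMPTION: mirrors the start-index formula from the list above.
    static int startIndex(String topicName, int parallelism) {
        return ((topicName.hashCode() * 31) & 0x7FFFFFFF) % parallelism;
    }

    /** Finds "baseName-N" whose start index equals that of referenceTopic. */
    static String alignWith(String referenceTopic, String baseName,
                            int parallelism, int maxTries) {
        int target = startIndex(referenceTopic, parallelism);
        for (int i = 0; i < maxTries; i++) {
            String candidate = baseName + "-" + i;
            if (startIndex(candidate, parallelism) == target) {
                return candidate;
            }
        }
        throw new IllegalStateException("no suffix found in " + maxTries + " tries");
    }

    public static void main(String[] args) {
        int parallelism = 8;
        String aligned = alignWith("orders", "payments", parallelism, 1000);
        System.out.println(aligned + " starts at subtask "
                + startIndex(aligned, parallelism));
    }
}
```

With equal start indexes, equal partition counts and a matching parallelism, partition N of both topics should land on the same subtask, which is the property the list above relies on.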

> On Mar 3, 2023, at 4:56 PM, Tommy May <tv...@gmail.com> wrote:
> 
> Hello, 
> 
> My team has a Flink streaming job that does a stateful join across two high throughput kafka topics. This results in a large amount of data ser/de and shuffling (about 1gb/s for context). We're running into a bottleneck on this shuffling step. We've attempted to optimize our flink configuration, join logic, scale out the kafka topics & flink job, and speed up state access. What we see is that the join step causes backpressure on the kafka sources and lag slowly starts to accumulate. 
> 
> One idea we had to optimize this is to pre-partition the data in kafka on the same key that the join is happening on. This'll effectively reduce data shuffling to 0 and remove the bottleneck that we're seeing. I've done some research into the topic and from what I understand this is not straightforward to take advantage of in Flink. It looks to be a fairly commonly requested feature based on the many StackOverflow posts and slack questions, and I noticed there is FLIP-186 which attempts to address this topic as well. 
> 
> Are there any upcoming plans to add this feature to a future Flink release? I believe it'd be super impactful for similar large scale jobs out there. I'd be interested in helping as well, but admittedly I'm relatively new to Flink.  I poked around the code a bit, and it certainly did not seem like a straightforward addition, so it may be best handled by someone with more internal knowledge.
> 
> Thanks,
> Tommy

--------------------------
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch