Posted to user@flink.apache.org by Viacheslav Chernyshev <v....@outlook.com> on 2023/03/16 16:33:46 UTC

Re: Is it possible to preserve chaining for multi-input operators?

Hi Matthias,

Just wanted to thank you for the hints! I've successfully developed a multi-stream operator that allows doing things like this:

KeyedMultiInputStream.builder(environment, new UserDefinedFunction())
        .addKeyedStream(fooSource, fooMapper, UserDefinedFunction::processFoo)
        .addKeyedStream(barSource, barMapper, UserDefinedFunction::processBar)
        .addBroadcastStream(bazSource, UserDefinedFunction::processBaz)
        .build();

Direct connectivity to the sources and optional on-the-fly mappers have completely eliminated the performance issues that we had been facing before.

Kind regards,
Viacheslav
________________________________
From: Schwalbe Matthias <Ma...@viseca.ch>
Sent: 28 February 2023 15:50
To: Viacheslav Chernyshev <v....@outlook.com>; user@flink.apache.org <us...@flink.apache.org>
Subject: RE: Is it possible to preserve chaining for multi-input operators?


Hi Viacheslav,



Certainly I can …



There are two parts to it:

  *   Setting up such a MultipleInputStreamOperator, which is documented (sort of), but not quite completely
     *   I can prepare some boilerplate, not today, but in the next few days (if you are interested)
  *   The second part is about how to put all joins and other operations into a single operator implementation (well, that is exactly what you do 😊):
     *   Equi-joins on the key: you process each input in its own Input implementation and join against the state kept from the other inputs
     *   Windowing is restricted to a single window key type (a Namespace in Flink-speak) for your operator
        *   Windowing can be implemented manually and modelled after the official Flink windowing operators
     *   Should you absolutely need more than one windowing namespace, then you need to get creative with state primitives
  *   You also mentioned broadcast streams, which means that in the end you’ll have more than 2 input streams: the keyed ones plus the broadcast streams
     *   This is where MultipleInputStreamOperator comes into play, because you are not restricted to only 2 input streams as in the KeyedCoProcessFunction case
     *   That gives you more freedom to combine data in a single operator instead of being forced to split/chain multiple operators
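
To make the equi-join and broadcast points above concrete, here is a minimal plain-Java sketch. It is only a model and deliberately uses no Flink classes: the maps stand in for keyed and broadcast state, and the class name, the method names (borrowed from the processFoo/processBar/processBaz naming used elsewhere in this thread), and the threshold rule are all hypothetical. In a real operator the keyed state would be scoped to the current key by the framework rather than looked up in a map.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model (not Flink API) of one operator fed by two keyed inputs and one broadcast input. */
final class SingleOperatorJoinSketch {
    // Stand-in for keyed state: the latest "foo" value seen per key.
    private final Map<String, Double> lastFooByKey = new HashMap<>();
    // Stand-in for broadcast state: the same rule on every parallel instance.
    private double threshold = 0.0;
    // Stand-in for the operator's output collector.
    private final List<String> output = new ArrayList<>();

    /** Input 1 (keyed): remember the value so that input 2 can join against it. */
    void processFoo(String key, double value) {
        lastFooByKey.put(key, value);
    }

    /** Input 2 (keyed): equi-join on the key against the state written by input 1. */
    void processBar(String key, double value) {
        Double foo = lastFooByKey.get(key);
        if (foo != null && foo + value >= threshold) {
            output.add(key + ":" + (foo + value));
        }
    }

    /** Input 3 (broadcast): update the rule that the keyed inputs apply. */
    void processBaz(double newThreshold) {
        threshold = newThreshold;
    }

    List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        SingleOperatorJoinSketch op = new SingleOperatorJoinSketch();
        op.processBaz(10.0);       // broadcast rule arrives
        op.processFoo("A", 4.0);   // stored under key "A"
        op.processBar("A", 7.0);   // joins with the stored foo for "A"
        op.processBar("B", 50.0);  // no foo stored for "B", so nothing is emitted
        System.out.println(op.output());
    }
}
```

Because all three inputs land in the same object, the join needs no hand-off between operators; that is the property the single-operator approach relies on.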



Kind regards



Thias









From: Viacheslav Chernyshev <v....@outlook.com>
Sent: Tuesday, February 28, 2023 3:42 PM
To: user@flink.apache.org
Subject: Re: Is it possible to preserve chaining for multi-input operators?



Hi Matthias,



Thank you for the reply. You are absolutely right, the first keyBy is unavoidable, but after that we fix the parallelism and maintain the same key throughout the pipeline.



The MultipleInputStreamOperator approach that you've described looks very interesting! Unfortunately, I have never used it before. Would you be able to share the details for how to force the chaining with e.g. two input streams?



Kind regards,

Viacheslav

________________________________

From: Schwalbe Matthias <Ma...@viseca.ch>
Sent: 28 February 2023 14:12
To: Viacheslav Chernyshev <v....@outlook.com>; user@flink.apache.org <us...@flink.apache.org>
Subject: RE: Is it possible to preserve chaining for multi-input operators?





Hi Viacheslav,



These are two very interesting questions…



You have run into the restriction that only single-input operators can be chained. It also does not help to union() multiple streams into a single input; they still count as multiple inputs.



  *   The harder way would be to patch the relevant parts of Flink to allow chaining with multiple inputs

     *   This is very complicated to get right, especially because the resulting multiple inputs and outputs need to be wrapped in a façade
     *   We once did it (successfully) and abandoned the idea because of its complexity and maintenance cost

  *   The other way might be to implement everything in one org.apache.flink.streaming.api.operators.MultipleInputStreamOperator, which allows any (reasonable) number of inputs: keyed, non-keyed, broadcast, or mixed. Let me explain:

     *   From what you say I assume that after the Kafka source you need to .keyBy() the instrument-id anyway, which means a shuffle and (de-)serialization; that is unavoidable.
     *   However, after that shuffle, the MultipleInputStreamOperator could force-chain all your logic as long as it stays in the same key/partition domain
     *   Integration of broadcast inputs is a no-brainer there
     *   We do these things all the time and it really helps to cut down serialization cost, among other things
     *   This approach does not necessarily help with keeping latency down, as more inputs means more time spent round-robining over the available inputs



I hope this helps



What do you think?



Regards



Thias











From: Viacheslav Chernyshev <v....@outlook.com>
Sent: Tuesday, February 28, 2023 1:06 PM
To: user@flink.apache.org
Subject: Is it possible to preserve chaining for multi-input operators?



Hi everyone,



My team is developing a streaming pipeline for analytics on top of market data. The ultimate goal is to be able to handle tens of millions of events per second distributed across the cluster according to the unique ID of a particular financial instrument. Unfortunately, we struggle with achieving acceptable performance. As far as I can see, Flink forcibly breaks operator chaining when it encounters a job graph node with multiple inputs. Consequently, this severely affects performance because a network boundary is enforced, and every event is serialised and deserialised.
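
The cost described above can be illustrated with a minimal plain-Java sketch. It is not Flink code: Flink uses its own type serializers, and java.io serialization is used here only as a stand-in. The Tick class and both method names are hypothetical. The point is simply that a chained hand-off is a method call on the same record, while a hand-off across a broken chain turns every record into bytes and rebuilds it on the other side.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

/** Plain-Java model (not Flink API) of a chained versus an unchained hand-off between two operators. */
final class ChainBoundarySketch {
    /** Hypothetical market-data event. */
    static final class Tick implements Serializable {
        private static final long serialVersionUID = 1L;
        final String instrumentId;
        final double price;

        Tick(String instrumentId, double price) {
            this.instrumentId = instrumentId;
            this.price = price;
        }
    }

    /** Chained: the downstream operator receives the very same object. */
    static Tick chainedHandOff(Tick tick) {
        return tick;
    }

    /** Unchained: the record is serialised to bytes and deserialised into a new object. */
    static Tick networkHandOff(Tick tick) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
                out.writeObject(tick);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
                return (Tick) in.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Tick tick = new Tick("XYZ", 101.5);
        System.out.println(chainedHandOff(tick) == tick); // same reference, no copy made
        System.out.println(networkHandOff(tick) == tick); // a rebuilt copy, not the same reference
    }
}
```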



From the pipeline graph perspective, the requirements are:

  *   Read data from multiple Kafka topics that are connected to different nodes in the graph.
  *   Broadcast a number of dynamic rules to the pipeline.

The cleanest way to achieve the first goal is to have a bunch of KeyedCoProcessFunction operations. This design didn't work for us because the SerDe overhead added by broken chains was too high, so we had to completely flatten the pipeline instead. Unfortunately, I can't find any way to solve the second problem: as soon as the broadcast stream is introduced into the pipeline, the performance tanks.



Is there any technique that I could possibly utilise to preserve the chaining?



Kind regards,

Viacheslav

This message is intended only for the named recipient and may contain confidential or privileged information. As the confidentiality of email communication cannot be guaranteed, we do not accept any responsibility for the confidentiality and the intactness of this message. If you have received it in error, please advise the sender by return e-mail and delete this message and any attachments. Any unauthorised use or dissemination of this information is strictly prohibited.


RE: Is it possible to preserve chaining for multi-input operators?

Posted by Schwalbe Matthias <Ma...@viseca.ch>.
Hi Viacheslav,

… back from vacation

… you are welcome, glad to hear it worked out 😊

Thias

