Posted to issues@flink.apache.org by "Alexander Smirnov (Jira)" <ji...@apache.org> on 2022/10/27 08:38:00 UTC

[jira] [Comment Edited] (FLINK-19822) Remove redundant shuffle for streaming

    [ https://issues.apache.org/jira/browse/FLINK-19822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17624953#comment-17624953 ] 

Alexander Smirnov edited comment on FLINK-19822 at 10/27/22 8:37 AM:
---------------------------------------------------------------------

Hi [~godfreyhe]! I was wondering why such an important improvement is still open, so I decided to test the implementation of the optimization from your PR. I found that when StreamPhysicalCalc also supports the 'satisfyTraits' method, removing redundant Exchanges comes with an unwanted side effect: Exchanges get moved around (as I understand it, the Volcano planner decides on its own where to put an Exchange). This has several negative consequences:

1. Filters can end up after the shuffle (previously filters were before the shuffle);
2. Local-Global optimization in Aggregate is broken;
3. Some operators work incorrectly when they are chained with Calc (WindowOperator, for example).

Maybe this is the reason the improvement has remained unresolved (in your commit I found a test checking that Calc doesn't push distribution through itself). Am I right?

Anyway, I decided to look for a different approach to this optimization. I came up with the idea of a dedicated rule, _StreamRemoveRedundantExchangeRule_, that traverses the RelNode graph from sources to sinks (previously the traversal was in the reverse order) and removes duplicated Exchanges. Information about the input distribution is passed through a new trait, {_}InputRelDistributionTrait{_}. To support the optimization, stream RelNodes need to implement a new method, _satisfyTraitsFromImputs_ (similar to the _satisfyTraits_ method).

Could you please look at my implementation and give some feedback? - [https://github.com/apache/flink/pull/21170] I reused some code from your PR; I hope you don't mind.
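
To illustrate the source-to-sink idea described above, here is a minimal toy sketch in plain Java. It is not the code from the PR and does not use Calcite or Flink classes: PlanNode, its factory methods, the preservesDistribution flag and removeRedundantExchanges are all hypothetical names invented for this illustration, and the plan is simplified to a linear chain of operators. The flag stands in, very roughly, for whatever a real node would decide about the distribution of its input.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model only: hypothetical stand-ins, not Calcite or Flink classes.
final class PlanNode {
    final String name;
    final List<String> hashKeys;          // non-null only for Exchange nodes
    final boolean preservesDistribution;  // assumed per-operator property

    private PlanNode(String name, List<String> hashKeys, boolean preservesDistribution) {
        this.name = name;
        this.hashKeys = hashKeys;
        this.preservesDistribution = preservesDistribution;
    }

    static PlanNode exchange(String... keys) {
        return new PlanNode("Exchange", Arrays.asList(keys), false);
    }

    static PlanNode operator(String name, boolean preservesDistribution) {
        return new PlanNode(name, null, preservesDistribution);
    }

    boolean isExchange() {
        return hashKeys != null;
    }
}

final class RedundantExchangeSketch {
    // Walks a linear plan from source to sink and drops every Exchange whose
    // hash keys equal the distribution already established upstream.
    static List<PlanNode> removeRedundantExchanges(List<PlanNode> sourceToSink) {
        List<PlanNode> result = new ArrayList<>();
        List<String> inputDistribution = null; // unknown at the source
        for (PlanNode node : sourceToSink) {
            if (node.isExchange()) {
                if (node.hashKeys.equals(inputDistribution)) {
                    continue; // same keys as upstream: this shuffle is redundant
                }
                inputDistribution = node.hashKeys;
            } else if (!node.preservesDistribution) {
                inputDistribution = null; // downstream can no longer rely on it
            }
            result.add(node);
        }
        return result;
    }
}
```

In this sketch, a chain Source, Exchange[a], Aggregate, Exchange[a], Join loses its second Exchange as long as Aggregate is marked as preserving the distribution. If the operator between the two Exchanges does not preserve it, both are kept.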


was (Author: JIRAUSER288574):
Hi [~godfreyhe]! I was wondering why such an important improvement is still open, so I decided to test the implementation of the optimization from your PR. I found that when StreamPhysicalCalc also supports the 'satisfyTraits' method, removing redundant Exchanges comes with an unwanted side effect: Exchanges get moved around (as I understand it, the Volcano planner decides on its own where to put an Exchange). Because of this I got several negative side effects:

1. Filters could end up after the shuffle (previously filters were before the shuffle);
2. Local-Global optimization in Aggregate is broken;
3. Some operators work incorrectly when they are chained with Calc (WindowOperator, for example).

Maybe this is the reason the improvement has remained unresolved (in your commit I found a test checking that Calc doesn't push distribution through itself). Am I right?

Anyway, I decided to look for a different approach to this optimization. I came up with the idea of a dedicated rule, _StreamRemoveRedundantExchangeRule_, that traverses the RelNode graph from sources to sinks (previously the traversal was in the reverse order) and removes duplicated Exchanges. Information about the input distribution is passed through a new trait, {_}InputRelDistributionTrait{_}. To support the optimization, stream RelNodes need to implement a new method, _satisfyTraitsFromImputs_ (similar to the _satisfyTraits_ method).

Could you please look at my implementation and give some feedback? - [https://github.com/apache/flink/pull/21170] I reused some code from your PR; I hope you don't mind.

> Remove redundant shuffle for streaming
> --------------------------------------
>
>                 Key: FLINK-19822
>                 URL: https://issues.apache.org/jira/browse/FLINK-19822
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Planner
>            Reporter: godfrey he
>            Priority: Not a Priority
>              Labels: auto-deprioritized-major, auto-deprioritized-minor, auto-unassigned, pull-request-available
>
> This is similar to [FLINK-12575|https://issues.apache.org/jira/browse/FLINK-12575]: we could implement the {{satisfyTraits}} method for stream nodes to remove redundant shuffles. This could also create more opportunities for operators to be merged into a multiple-input operator.
> Unlike batch operators, stream operators require that the shuffle keys and the state keys be exactly the same; otherwise the state may be incorrect.
> We only support a few operators in this issue, such as Join and regular Aggregate. Other operators will be supported in the future.
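
The exact-match requirement for stream operators stated in the description can be sketched as a small check. This is only an illustration: StreamDistributionCheck and canReuseShuffle are hypothetical names, not part of Flink, and treating the key order as significant is an assumption made here for simplicity.

```java
import java.util.List;

final class StreamDistributionCheck {
    // Hypothetical helper: an existing hash distribution is reused only when its
    // keys are exactly the operator's state keys (order-sensitive by assumption).
    static boolean canReuseShuffle(List<String> shuffleKeys, List<String> stateKeys) {
        return shuffleKeys.equals(stateKeys);
    }
}
```

Under this check, a stream already shuffled by [a] could not be reused by an operator keyed on [a, b]; a new shuffle would still be needed so that the state stays partitioned by the operator's own keys.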



--
This message was sent by Atlassian Jira
(v8.20.10#820010)