Posted to issues@flink.apache.org by "Dong Lin (Jira)" <ji...@apache.org> on 2022/12/29 01:31:00 UTC

[jira] [Assigned] (FLINK-30531) Reduce operator chain call stack depth

     [ https://issues.apache.org/jira/browse/FLINK-30531?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dong Lin reassigned FLINK-30531:
--------------------------------

    Assignee: Dong Lin

> Reduce operator chain call stack depth
> --------------------------------------
>
>                 Key: FLINK-30531
>                 URL: https://issues.apache.org/jira/browse/FLINK-30531
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Task
>            Reporter: Dong Lin
>            Assignee: Dong Lin
>            Priority: Major
>
> Benchmark results show that Flink takes more than 3X as long as Spark to execute simple programs. For example, running the following program with object re-use enabled and parallelism=1 takes roughly 120 sec on a MacBook, whereas Spark needs less than 40 sec to run the same logic on the same machine.
> {code:java}
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.getConfig().enableObjectReuse();
> env.setParallelism(1);
> env.fromSequence(1, 1000000000L)
>     .map(x -> x)
>     .map(x -> x)
>     .map(x -> x)
>     .map(x -> x)
>     .map(x -> x)
>     .addSink(new DiscardingSink<>());
> env.execute();
> {code}
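>  
> The Spark program used for the comparison is not included in this issue. For reference, a hypothetical equivalent using Spark's Java Dataset API (the class names, setup, and sink below are assumptions, not the benchmarked code) could look like this:
> {code:java}
> // Hypothetical Spark counterpart of the Flink pipeline above; a sketch only,
> // not the program that was actually benchmarked.
> import org.apache.spark.api.java.function.ForeachFunction;
> import org.apache.spark.api.java.function.MapFunction;
> import org.apache.spark.sql.Dataset;
> import org.apache.spark.sql.Encoders;
> import org.apache.spark.sql.SparkSession;
> 
> public class SparkIdentityMapBenchmark {
>     public static void main(String[] args) {
>         SparkSession spark = SparkSession.builder()
>                 .master("local[1]")               // single core, comparable to parallelism=1
>                 .appName("identity-map-bench")
>                 .getOrCreate();
> 
>         Dataset<Long> ds = spark.range(1L, 1_000_000_001L);   // range end is exclusive
>         for (int i = 0; i < 5; i++) {
>             ds = ds.map((MapFunction<Long, Long>) x -> x, Encoders.LONG());
>         }
>         ds.foreach((ForeachFunction<Long>) x -> { });         // discard all records
> 
>         spark.stop();
>     }
> }
> {code}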
>  
> It turns out that the operator-chaining overhead introduced by Flink is surprisingly high. For the above example program, the Flink runtime goes through a call stack of 24 functions to produce a single element, and each extra map(...) operation adds 4 more functions to the call stack.
> Here are the 24 functions in the call stack:
> {code:bash}
> StreamTask#processInput
> StreamOneInputProcessor#processInput
> StreamTaskSourceInput#emitNext
> SourceOperator#emitNext
> IteratorSourceReaderBase#pollNext
> SourceOutputWithWatermarks#collect
> AsyncDataOutputToOutput#emitRecord
> ChainingOutput#collect
> StreamMap#processElement
> CountingOutput#collect
> ChainingOutput#collect
> StreamMap#processElement
> CountingOutput#collect
> ChainingOutput#collect
> StreamMap#processElement
> CountingOutput#collect
> ChainingOutput#collect
> StreamMap#processElement
> CountingOutput#collect
> ChainingOutput#collect
> StreamMap#processElement
> CountingOutput#collect
> ChainingOutput#collect
> StreamSink#processElement
> {code}
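>  
> To make the per-operator cost concrete, the following is a simplified model of the chaining pattern visible in the stack above. The class names mirror the Flink classes in the stack trace, but the bodies are illustrative assumptions, not Flink's actual implementation:
> {code:java}
> // Simplified model of operator chaining; illustrative assumptions only.
> import java.util.function.UnaryOperator;
> 
> interface Output<T> { void collect(T record); }
> 
> abstract class OneInputOperator<T> {
>     Output<T> output;                            // wraps the downstream operator
>     abstract void processElement(T record);
> }
> 
> class ChainingOutput<T> implements Output<T> {
>     private final OneInputOperator<T> next;
>     ChainingOutput(OneInputOperator<T> next) { this.next = next; }
>     public void collect(T record) { next.processElement(record); }               // 1st extra frame per operator
> }
> 
> class StreamMap<T> extends OneInputOperator<T> {
>     private final UnaryOperator<T> userFunction;
>     StreamMap(UnaryOperator<T> userFunction) { this.userFunction = userFunction; }
>     void processElement(T record) { output.collect(userFunction.apply(record)); } // 2nd frame, plus the user function call
> }
> 
> class CountingOutput<T> implements Output<T> {
>     private final Output<T> inner;               // the next operator's ChainingOutput
>     private long numRecordsOut;
>     CountingOutput(Output<T> inner) { this.inner = inner; }
>     public void collect(T record) { numRecordsOut++; inner.collect(record); }     // 3rd frame per operator
> }
> 
> class StreamSink<T> extends OneInputOperator<T> {
>     void processElement(T record) { /* discard, like DiscardingSink */ }
> }
> 
> public class ChainModelDemo {
>     public static void main(String[] args) {
>         OneInputOperator<Long> sink = new StreamSink<>();
>         OneInputOperator<Long> map = new StreamMap<>(x -> x);
>         map.output = new CountingOutput<>(new ChainingOutput<>(sink));  // wire map -> sink
>         new ChainingOutput<>(map).collect(42L);                         // push one element through the chain
>     }
> }
> {code}
> With this structure, every additional map() contributes the ChainingOutput#collect, StreamMap#processElement and CountingOutput#collect frames seen in the stack above, plus the invocation of the user-defined map function itself.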
>  
> Given the evidence described above, we find the following explanations for why Flink is slow on programs with low per-record computation cost:
>  * The operator chain currently uses a pull-based loop, which has worse branch-prediction behavior than a push-based loop [1].
>  * Java's maximum inline level is less than 18 [2]. The operator chain call stack can easily exceed this limit, which prevents the JVM from inlining the function calls and further increases the per-record call overhead.
>  * Function calls that are not inlined also require a virtual-table lookup, since most of the functions involved are virtual (see the sketch after this list).
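>  
> The inlining point can be checked outside of Flink with a small standalone sketch (assumptions only, not Flink code): it pushes a billion elements through a chain of identity stages behind a virtual call, and can be run with -XX:+UnlockDiagnosticVMOptions -XX:+PrintInlining (optionally varying -XX:MaxInlineLevel) to see how deep the JIT inlines the chain:
> {code:java}
> // Standalone sketch (assumptions only, not Flink code): a chain of identity stages
> // behind a virtual call, mimicking a 5-operator chain pushing one element at a time.
> interface Stage { void push(long value); }
> 
> final class IdentityStage implements Stage {
>     private final Stage next;
>     IdentityStage(Stage next) { this.next = next; }
>     @Override public void push(long value) { next.push(value); }  // one virtual call per stage
> }
> 
> final class SumSink implements Stage {
>     long sum;
>     @Override public void push(long value) { sum += value; }
> }
> 
> public final class ChainDepthSketch {
>     public static void main(String[] args) {
>         SumSink sink = new SumSink();
>         Stage chain = sink;
>         for (int i = 0; i < 5; i++) {
>             chain = new IdentityStage(chain);    // like 5 chained map(x -> x) operators
>         }
>         long start = System.nanoTime();
>         for (long x = 1; x <= 1_000_000_000L; x++) {
>             chain.push(x);                       // push each element through the whole chain
>         }
>         System.out.printf("%.1fs, sum=%d%n", (System.nanoTime() - start) / 1e9, sink.sum);
>     }
> }
> {code}
> Comparing the measured time against a plain for-loop over the same range gives a rough sense of the per-element call overhead once the chain no longer fits within the JIT's inline budget.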
>  
> [1] [https://arxiv.org/pdf/1610.09166.pdf]
> [2] [https://bugs.openjdk.org/browse/JDK-8234863]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)