Posted to issues@flink.apache.org by "swy (JIRA)" <ji...@apache.org> on 2018/06/03 15:04:00 UTC

[jira] [Comment Edited] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

    [ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16499445#comment-16499445 ] 

swy edited comment on FLINK-9506 at 6/3/18 3:03 PM:
----------------------------------------------------

What we want to know is: is this expected (a drop of more than 100%), or is something wrong in our code? We understand that using state will affect performance, but not by this much, and not in a fluctuating pattern.
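For reference, a throughput drop measured against the stateless baseline cannot exceed 100%, so "more than 100%" presumably means the stateless run is more than twice as fast as the stateful one. The small Java sketch below computes both readings. The class name ThroughputDrop and the two throughput figures are hypothetical and are not measurements from this report.

```java
// Hypothetical helper: compares stateless vs. stateful throughput (records/second).
public class ThroughputDrop {

    /** Drop relative to the stateless baseline, in percent; can never exceed 100. */
    public static double dropPercent(double baseline, double withState) {
        return (baseline - withState) / baseline * 100.0;
    }

    /** How much faster the baseline is than the stateful run, in percent; unbounded. */
    public static double speedupPercent(double baseline, double withState) {
        return (baseline / withState - 1.0) * 100.0;
    }

    public static void main(String[] args) {
        // Illustrative numbers only, not taken from the report.
        double baseline = 30000.0;   // records/s with recStore.add(r) commented out
        double withState = 12000.0;  // records/s with recStore.add(r) enabled
        System.out.printf("drop: %.1f%%%n", dropPercent(baseline, withState));              // prints "drop: 60.0%"
        System.out.printf("baseline faster by: %.1f%%%n", speedupPercent(baseline, withState)); // prints "baseline faster by: 150.0%"
    }
}
```

With these made-up figures the same result reads as a 60% drop or as a baseline that is 150% faster, so stating which measure is meant avoids ambiguity.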


was (Author: yow):
What we want to know is: is this expected, or is something wrong in our code?

> Flink ReducingState.add causing more than 100% performance drop
> ---------------------------------------------------------------
>
>                 Key: FLINK-9506
>                 URL: https://issues.apache.org/jira/browse/FLINK-9506
>             Project: Flink
>          Issue Type: Improvement
>    Affects Versions: 1.4.2
>            Reporter: swy
>            Priority: Major
>         Attachments: flink.png
>
>
> Hi, we found that application performance drops by more than 100% when ReducingState.add is used in the source code. Checkpointing is disabled in the test, and the filesystem backend (HDFS) is used as the state backend.
> It can easily be reproduced with a simple app that has checkpointing disabled and simply keeps storing records, using a simple reduce function (in fact, an empty function shows the same result). Any ideas would be appreciated. This is an unbelievably obvious issue.
> Basically, the app just keeps storing records into the state, and we measure how many records per second pass through "JsonTranslator", as shown in the graph. The only difference between the two runs is one line: commenting/uncommenting "recStore.add(r)".
> {code}
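> DataStream<String> stream = env.addSource(new GeneratorSource(loop));
> DataStream<JSONObject> convert = stream.map(new JsonTranslator())
>                                        .keyBy(...) // key selector not shown in the original
>                                        .process(new ProcessAggregation())
>                                        .map(new PassthruFunction());
> public class ProcessAggregation extends ProcessFunction<Record, Record> {
>     private transient ReducingState<Record> recStore;
>     @Override
>     public void open(Configuration parameters) {
>         // state setup was not shown in the original; descriptor name and reduce function are placeholders
>         recStore = getRuntimeContext().getReducingState(
>                 new ReducingStateDescriptor<>("recStore", (a, b) -> b, Record.class));
>     }
>     @Override
>     public void processElement(Record r, Context ctx, Collector<Record> out) throws Exception {
>         recStore.add(r); // this line makes the difference
>     }
> }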
> {code}
> Record is a POJO class containing 50 private String members.
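When reasoning about the per-record cost of the one-line difference, it may help to model what a keyed ReducingState.add conceptually does: look up the current value for the key, combine it with the new element through the reduce function (or store the element as-is if the key has no value yet), and write the result back. The plain-Java model below is a simplification under that assumption, not Flink's heap or RocksDB state-backend code; the class KeyedReducingStore and its methods are hypothetical. It shows that even a near-empty reduce function still costs a lookup and a write on every add.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

/**
 * Hypothetical, simplified model of keyed ReducingState semantics.
 * Not Flink code: it only illustrates the get -> reduce -> put cycle per add().
 */
public class KeyedReducingStore<K, V> {
    private final Map<K, V> table = new HashMap<>();
    private final BinaryOperator<V> reduceFunction;

    public KeyedReducingStore(BinaryOperator<V> reduceFunction) {
        this.reduceFunction = reduceFunction;
    }

    /** Stores value as-is for a new key; otherwise replaces the stored value with reduce(old, value). */
    public void add(K key, V value) {
        // Map.merge does the lookup, applies the function if a value exists, and writes the result back.
        table.merge(key, value, reduceFunction);
    }

    /** Returns the reduced value for the key, or null if nothing was added for it. */
    public V get(K key) {
        return table.get(key);
    }

    public static void main(String[] args) {
        // A summing reducer.
        KeyedReducingStore<String, Integer> sums = new KeyedReducingStore<>(Integer::sum);
        sums.add("a", 1);
        sums.add("a", 2);
        sums.add("b", 5);
        System.out.println(sums.get("a") + " " + sums.get("b")); // prints "3 5"

        // A "keep latest" reducer, standing in for a near-empty reduce function.
        KeyedReducingStore<String, String> latest = new KeyedReducingStore<>((oldV, newV) -> newV);
        latest.add("k", "r1");
        latest.add("k", "r2");
        System.out.println(latest.get("k")); // prints "r2"
    }
}
```

In a real state backend the stored value may also need copying or (de)serialization, so for a 50-field Record the per-add cost can be larger than this in-memory map suggests.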


