Posted to issues@flink.apache.org by "swy (JIRA)" <ji...@apache.org> on 2018/06/03 12:48:00 UTC

[jira] [Issue Comment Deleted] (FLINK-9442) Flink Scaling not working

     [ https://issues.apache.org/jira/browse/FLINK-9442?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

swy updated FLINK-9442:
-----------------------
    Comment: was deleted

(was: OK, I will post to the mailing list as well. Thanks.
By the way, once we changed the source to RichParallelSourceFunction, it works much better.

But our real application, which uses FlinkKafkaConsumer011 (already a RichParallelSourceFunction), is still not able to scale. Any tips to share?)
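
For context on the RichParallelSourceFunction remark: a parallel source only helps if each subtask emits its own distinct share of the records. The plain-Java sketch below (no Flink dependency) shows one possible round-robin split of the record indices. In a Flink job the two numbers would come from getRuntimeContext().getIndexOfThisSubtask() and getRuntimeContext().getNumberOfParallelSubtasks(). The class and method names are hypothetical and are not taken from the reporter's code, and the parallelism of 6 is an assumption.

```java
// Hypothetical helper, not part of the reporter's job: shows how a parallel
// source could divide `loop` records among its subtasks without overlap.
class ParallelSourceSplit {

    // Counts the record indices i in [0, loop) with i % numSubtasks == subtaskIndex.
    static long recordsForSubtask(long loop, int subtaskIndex, int numSubtasks) {
        if (loop < 0 || numSubtasks <= 0 || subtaskIndex < 0 || subtaskIndex >= numSubtasks) {
            throw new IllegalArgumentException("invalid record count, subtask index or parallelism");
        }
        long base = loop / numSubtasks;
        long remainder = loop % numSubtasks;
        // The first `remainder` subtasks each take one extra record.
        return base + (subtaskIndex < remainder ? 1 : 0);
    }

    public static void main(String[] args) {
        long loop = 3_000_000L; // record count from the test description
        int numSubtasks = 6;    // assumed source parallelism
        long total = 0;
        for (int subtask = 0; subtask < numSubtasks; subtask++) {
            long share = recordsForSubtask(loop, subtask, numSubtasks);
            total += share;
            System.out.println("subtask " + subtask + " emits " + share + " records");
        }
        System.out.println("total = " + total);
    }
}
```

The shares always add up to `loop`, so the job still emits each record exactly once, just from several subtasks instead of one.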

> Flink Scaling not working
> -------------------------
>
>                 Key: FLINK-9442
>                 URL: https://issues.apache.org/jira/browse/FLINK-9442
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.4.2
>            Reporter: swy
>            Priority: Major
>
> Hi,
>  
> We are in the middle of testing the scaling ability of Flink, but we found that scaling does not work, no matter whether we add more slots or increase the number of Task Managers. We would expect linear, or at least close-to-linear, scaling, but the results even show degradation. Any comments would be appreciated.
>  
> Test details:
>  
> - VMware vSphere
> - A simple pass-through test:
>     - auto-generated source of 3 million records, each 1 KB in size, parallelism = 1
>     - the source feeds a map operator that just returns the same record and sends a counter to StatsD; tested with parallelism = 2, 4 and 6
> - 3 TMs, 6 slots in total (2 per TM); each JM/TM has 32 vCPUs and 100 GB of memory
> - Results:
>     - 2 slots: 26 seconds, 3 mil / 26 = 115k TPS
>     - 4 slots: 23 seconds, 3 mil / 23 = 130k TPS
>     - 6 slots: 22 seconds, 3 mil / 22 = 136k TPS
>  
> As shown, there is almost no scaling, and throughput is capped at around 120k TPS. Any clue? Thanks.
>  
>  
>  
>     // Pass-through map: returns each record unchanged and increments a StatsD counter.
>     public class passthru extends RichMapFunction<String, String> {
>         @Override
>         public void open(Configuration configuration) throws Exception {
>             ... ...
>             stats = new NonBlockingStatsDClient();
>         }
>         @Override
>         public String map(String value) throws Exception {
>             ... ...
>             stats.increment();
>             return value;
>         }
>     }
>     // Generates `loop` records. As a RichSourceFunction it is non-parallel, so it always runs with parallelism 1.
>     public class datagen extends RichSourceFunction<String> {
>         ... ...
>         @Override
>         public void run(SourceContext<String> ctx) throws Exception {
>             int i = 0;
>             while (run) {
>                 String idx = String.format("%09d", i);
>                 ctx.collect("{\"<a 1kb json content with idx in certain json field>\"}");
>                 i++;
>                 if (i == loop) {
>                     run = false;
>                 }
>             }
>         }
>         ... ...
>     }
>     public class Job {
>         public static void main(String[] args) throws Exception {
>             ... ...
>             // rebalance() distributes records round-robin across the parallel map subtasks.
>             DataStream<String> stream = env.addSource(new datagen(loop)).rebalance();
>             DataStream<String> convert = stream.map(new passthru(statsdUrl));
>             env.execute("Flink");
>         }
>     }
> The reason for this sample test is that our Kafka source, FlinkKafkaConsumer011, faces the same issue: it does not scale, even though FlinkKafkaConsumer011 is already a RichParallelSourceFunction. We always set the number of Kafka partitions equal to the total number of TM slots, but throughput is still capped and does not improve linearly.
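
The timings quoted in the issue can be turned into throughput and scaling-efficiency figures with a short calculation. The sketch below is plain Java with no Flink dependency. It takes the 2-slot run as the baseline, which is an assumption made for illustration, and the class and method names are hypothetical.

```java
// Hypothetical helper for reading the reported results; not part of the job.
class ScalingEfficiency {

    // Records per second, rounded down.
    static long tps(long records, long seconds) {
        return records / seconds;
    }

    // Measured speedup divided by the ideal (linear) speedup, relative to a baseline run.
    static double efficiency(int baseSlots, long baseSeconds, int slots, long seconds) {
        double speedup = (double) baseSeconds / seconds;
        double ideal = (double) slots / baseSlots;
        return speedup / ideal;
    }

    public static void main(String[] args) {
        long records = 3_000_000L;     // record count from the test description
        int[] slots = {2, 4, 6};       // map parallelism per run
        long[] seconds = {26, 23, 22}; // reported wall-clock times
        for (int i = 0; i < slots.length; i++) {
            System.out.printf("%d slots: %d TPS, efficiency %.2f%n",
                    slots[i],
                    tps(records, seconds[i]),
                    efficiency(slots[0], seconds[0], slots[i], seconds[i]));
        }
    }
}
```

Relative to the 2-slot run, efficiency comes out at about 57% with 4 slots and about 39% with 6 slots, which puts a number on how far the results are from linear scaling.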



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)