Posted to issues@flink.apache.org by "Yue Shang (Jira)" <ji...@apache.org> on 2022/07/18 12:38:00 UTC

[jira] [Reopened] (FLINK-28562) Rocksdb state backend is too slow when using AggregateFunction

     [ https://issues.apache.org/jira/browse/FLINK-28562?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yue Shang reopened FLINK-28562:
-------------------------------

> Rocksdb state backend is too slow when using AggregateFunction
> --------------------------------------------------------------
>
>                 Key: FLINK-28562
>                 URL: https://issues.apache.org/jira/browse/FLINK-28562
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.13.2, 1.14.3
>         Environment: {code:java}
> final ParameterTool params = ParameterTool.fromArgs(args);
> // set up the execution environment
> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> // get input data
> DataStream<UserTag> source = env.addSource(new SourceFunction<UserTag>() {
>     // volatile so that cancel(), called from another thread, stops the loop in run()
>     private volatile boolean running = true;
>     @Override
>     public void run(SourceContext<UserTag> ctx) throws Exception {
>         Random rd = new Random();
>         while (running) {
>             UserTag userTag = new UserTag();
>             userTag.setUserId(rd.nextLong());
>             userTag.setMetricName(UUID.randomUUID().toString());
>             userTag.setTagDimension(UUID.randomUUID().toString());
>             userTag.setTagValue(rd.nextDouble());
>             // note: casting epoch milliseconds to int overflows
>             userTag.setTagTime((int) System.currentTimeMillis());
>             userTag.setMonitorTime((int) System.currentTimeMillis());
>             userTag.setAggregationPeriod("5s");
>             userTag.setAggregationType("sum");
>             userTag.setTimePeriod("hour");
>             userTag.setDataType("number");
>             userTag.setBaseTime(1657803600);
>             userTag.setTopic(UUID.randomUUID().toString());
>             // emit 100 records per iteration, each with a distinct userTagName
>             for (int i = 0; i < 100; i++) {
>                 userTag.setUserTagName(UUID.randomUUID() + "-" + i);
>                 ctx.collect(userTag);
>             }
>             Thread.sleep(1);
>         }
>     }
>     @Override
>     public void cancel() {
>         running = false;
>     }
> });
> source.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
>         .aggregate(new AggregateFunction<UserTag, Map<String,UserTag>, List<UserTag>>(){
>             @Override
>             public Map<String, UserTag> createAccumulator() {
>                 return new HashMap<>();
>             }
>             @Override
>             public Map<String, UserTag> add(UserTag userTag, Map<String, UserTag> stringUserTagMap) {
>                 stringUserTagMap.put(userTag.getUserTagName(),userTag);
>                 return stringUserTagMap;
>             }
>             @Override
>             public List<UserTag> getResult(Map<String, UserTag> stringUserTagMap) {
>                 return new ArrayList<>(stringUserTagMap.values());
>             }
>             @Override
>             public Map<String, UserTag> merge(Map<String, UserTag> acc1, Map<String, UserTag> acc2) {
>                 acc1.putAll(acc2);
>                 return acc1;
>             }
>         })
>         .setParallelism(1)
>         .name("NewUserTagAggregation_5s")
>         .print().setParallelism(2);
> // execute program
> env.execute(); {code}
> !image-2022-07-15-14-19-21-200.png!
> !image-2022-07-15-14-19-41-678.png!
>            Reporter: Yue Shang
>            Priority: Major
>         Attachments: image-2022-07-15-14-19-21-200.png, image-2022-07-15-14-19-41-678.png, image-2022-07-18-19-56-45-650.png, image-2022-07-18-19-59-18-872.png, image-2022-07-18-20-34-54-407.png, image-2022-07-18-20-36-23-021.png
>
>
> The RocksDB state backend is too slow when using an AggregateFunction.
> With a Map<String,Object> accumulator, it only supports about 300 records per second.
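
One likely contributor, stated here as an assumption rather than a confirmed diagnosis: with the RocksDB backend, window aggregation state keeps each accumulator as a single serialized value, so every AggregateFunction.add() call reads the whole Map from RocksDB, deserializes it, inserts one entry, re-serializes it and writes it back. Because the reproducer puts a distinct userTagName into the map for every record, the per-record cost grows with the number of records already in the 5-second window. The stdlib-only Java sketch below is a hypothetical cost model, not Flink code; the class SerializedAccumulatorCost, its methods and its byte encoding are invented for illustration. It compares the bytes processed by such a read-modify-write accumulator with an append-only store that encodes only each new entry.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical cost model, not Flink code: it only counts the bytes that a
// store holding serialized values would decode and encode per add().
final class SerializedAccumulatorCost {

    // Writes one (key, value) entry; this encoding is invented for illustration.
    private static void writeEntry(DataOutputStream out, String key, double value) throws IOException {
        out.writeUTF(key);
        out.writeDouble(value);
    }

    // Encodes the whole map: entry count followed by every entry.
    static byte[] serialize(Map<String, Double> map) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(map.size());
            for (Map.Entry<String, Double> entry : map.entrySet()) {
                writeEntry(out, entry.getKey(), entry.getValue());
            }
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
        return bytes.toByteArray();
    }

    // Decodes a map written by serialize().
    static Map<String, Double> deserialize(byte[] data) {
        Map<String, Double> map = new HashMap<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int size = in.readInt();
            for (int i = 0; i < size; i++) {
                map.put(in.readUTF(), in.readDouble());
            }
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
        return map;
    }

    // Read-modify-write: every add() decodes and re-encodes the full accumulator.
    static long readModifyWriteBytes(int n) {
        byte[] stored = serialize(new HashMap<>());
        long processed = 0;
        for (int i = 0; i < n; i++) {
            Map<String, Double> acc = deserialize(stored); // read the whole value
            processed += stored.length;
            acc.put("tag-" + i, (double) i);               // the add() step itself
            stored = serialize(acc);                       // write the whole value back
            processed += stored.length;
        }
        return processed;
    }

    // Append-only: every add() encodes just the new entry.
    static long appendOnlyBytes(int n) {
        long processed = 0;
        for (int i = 0; i < n; i++) {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                writeEntry(out, "tag-" + i, i);
            } catch (IOException ex) {
                throw new UncheckedIOException(ex);
            }
            processed += bytes.size();
        }
        return processed;
    }

    public static void main(String[] args) {
        for (int n : new int[] {500, 1_000, 2_000}) {
            System.out.printf("n=%d  read-modify-write=%d bytes  append-only=%d bytes%n",
                    n, readModifyWriteBytes(n), appendOnlyBytes(n));
        }
    }
}
```

With this encoding, doubling n roughly quadruples the read-modify-write total, while the append-only total roughly doubles.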



--
This message was sent by Atlassian Jira
(v8.20.10#820010)