Posted to issues@flink.apache.org by "Yue Shang (Jira)" <ji...@apache.org> on 2022/07/15 06:21:00 UTC

[jira] [Created] (FLINK-28562) Rocksdb state backend is too slow when using AggregateFunction

Yue Shang created FLINK-28562:
---------------------------------

             Summary: Rocksdb state backend is too slow when using AggregateFunction
                 Key: FLINK-28562
                 URL: https://issues.apache.org/jira/browse/FLINK-28562
             Project: Flink
          Issue Type: Bug
    Affects Versions: 1.14.3, 1.13.2
         Environment: {code:java}
final ParameterTool params = ParameterTool.fromArgs(args);

// set up the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// get input data
DataStream<UserTag> source = env.addSource(new SourceFunction<UserTag>() {
    @Override
    public void run(SourceContext<UserTag> ctx) throws Exception {
        Random rd = new Random();
        while (true){
            UserTag userTag = new UserTag();
            userTag.setUserId(rd.nextLong());
            userTag.setMetricName(UUID.randomUUID().toString());
            userTag.setTagDimension(UUID.randomUUID().toString());
            userTag.setTagValue(rd.nextDouble());
            // Same narrowing to int as the deprecated new Long(...).intValue()
            userTag.setTagTime((int) System.currentTimeMillis());
            userTag.setMonitorTime((int) System.currentTimeMillis());
            userTag.setAggregationPeriod("5s");
            userTag.setAggregationType("sum");
            userTag.setTimePeriod("hour");
            userTag.setDataType("number");
            userTag.setBaseTime(1657803600);
            userTag.setTopic(UUID.randomUUID().toString());
            for(int i = 0;i<100;i++){
                userTag.setUserTagName(UUID.randomUUID() + "-"+i);
                ctx.collect(userTag);
            }
            Thread.sleep(1);
        }
    }
    @Override
    public void cancel() {
    }
});

source.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
        .aggregate(new AggregateFunction<UserTag, Map<String,UserTag>, List<UserTag>>(){
            @Override
            public Map<String, UserTag> createAccumulator() {
                return new HashMap<>();
            }

            @Override
            public Map<String, UserTag> add(UserTag userTag, Map<String, UserTag> stringUserTagMap) {
                stringUserTagMap.put(userTag.getUserTagName(),userTag);
                return stringUserTagMap;
            }

            @Override
            public List<UserTag> getResult(Map<String, UserTag> stringUserTagMap) {
                return new ArrayList<>(stringUserTagMap.values());
            }

            @Override
            public Map<String, UserTag> merge(Map<String, UserTag> acc1, Map<String, UserTag> acc2) {
                acc1.putAll(acc2);
                return acc1;
            }
        })
        .setParallelism(1)
        .name("NewUserTagAggregation_5s")
        .print().setParallelism(2);

// execute program
env.execute(); {code}
!image-2022-07-15-14-19-21-200.png!

!image-2022-07-15-14-19-41-678.png!
            Reporter: Yue Shang
         Attachments: image-2022-07-15-14-19-21-200.png, image-2022-07-15-14-19-41-678.png

The RocksDB state backend is too slow when using an AggregateFunction.

With a Map<String,Object> accumulator, the job handles only about 300 records per second.
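
A possible explanation, offered as an assumption rather than a confirmed diagnosis: the RocksDB backend keeps state as serialized bytes, so each add() on the window's aggregating state would read and deserialize the whole accumulator, update it, and serialize it back. With a map accumulator that grows during the window, the cost per record then grows with the map size. The sketch below only illustrates that effect. It uses plain Java serialization as a stand-in for Flink's serializers, a Map<String, Double> in place of UserTag, and a hypothetical class and method name (AccumulatorRoundTripSketch, addWithRoundTrip) that do not come from Flink.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.HashMap;

// Hypothetical illustration, not Flink code: simulates a state store that
// holds the accumulator only as bytes and round-trips it on every add.
public class AccumulatorRoundTripSketch {

    // Serialize the whole accumulator (stand-in for writing state to the store).
    static byte[] serialize(HashMap<String, Double> acc) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(acc);
            }
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    // Deserialize the whole accumulator (stand-in for reading state from the store).
    @SuppressWarnings("unchecked")
    static HashMap<String, Double> deserialize(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return (HashMap<String, Double>) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // Adds `records` distinct entries, round-tripping the full accumulator
    // on every add. Returns the total number of bytes written.
    static long addWithRoundTrip(int records) {
        byte[] stored = serialize(new HashMap<>());
        long bytesWritten = 0;
        for (int i = 0; i < records; i++) {
            HashMap<String, Double> acc = deserialize(stored); // read everything
            acc.put("tag-" + i, (double) i);                   // change one entry
            stored = serialize(acc);                           // write everything
            bytesWritten += stored.length;
        }
        return bytesWritten;
    }

    public static void main(String[] args) {
        for (int records : new int[] {1_000, 2_000, 4_000}) {
            long start = System.nanoTime();
            long bytes = addWithRoundTrip(records);
            long millis = (System.nanoTime() - start) / 1_000_000;
            System.out.println(records + " records: " + bytes + " bytes written, " + millis + " ms");
        }
    }
}
```

In this sketch, doubling the number of records more than doubles the bytes written, because every add rewrites all earlier entries. If the same pattern applies to the job above, keeping the accumulator small, or storing entries individually instead of in one map value, would be the directions to test.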



--
This message was sent by Atlassian Jira
(v8.20.10#820010)