Posted to issues@flink.apache.org by "Yue Shang (Jira)" <ji...@apache.org> on 2022/07/18 11:57:00 UTC
[jira] [Updated] (FLINK-28562) Rocksdb state backend is too slow when using AggregateFunction
[ https://issues.apache.org/jira/browse/FLINK-28562?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Yue Shang updated FLINK-28562:
------------------------------
Attachment: image-2022-07-18-19-56-45-650.png
> Rocksdb state backend is too slow when using AggregateFunction
> --------------------------------------------------------------
>
> Key: FLINK-28562
> URL: https://issues.apache.org/jira/browse/FLINK-28562
> Project: Flink
> Issue Type: Bug
> Affects Versions: 1.13.2, 1.14.3
> Environment: {code:java}
> final ParameterTool params = ParameterTool.fromArgs(args);
> // set up the execution environment
> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> // get input data: a synthetic source that emits 100 records per loop iteration
> DataStream<UserTag> source = env.addSource(new SourceFunction<UserTag>() {
>     // cleared by cancel() so that run() can exit
>     private volatile boolean running = true;
>     @Override
>     public void run(SourceContext<UserTag> ctx) throws Exception {
>         Random rd = new Random();
>         while (running) {
>             UserTag userTag = new UserTag();
>             userTag.setUserId(rd.nextLong());
>             userTag.setMetricName(UUID.randomUUID().toString());
>             userTag.setTagDimension(UUID.randomUUID().toString());
>             userTag.setTagValue(rd.nextDouble());
>             // narrowing cast keeps the original behavior (low 32 bits of the epoch millis)
>             userTag.setTagTime((int) System.currentTimeMillis());
>             userTag.setMonitorTime((int) System.currentTimeMillis());
>             userTag.setAggregationPeriod("5s");
>             userTag.setAggregationType("sum");
>             userTag.setTimePeriod("hour");
>             userTag.setDataType("number");
>             userTag.setBaseTime(1657803600);
>             userTag.setTopic(UUID.randomUUID().toString());
>             // each emitted record gets a unique tag name, so the accumulator map keeps growing
>             for (int i = 0; i < 100; i++) {
>                 userTag.setUserTagName(UUID.randomUUID() + "-" + i);
>                 ctx.collect(userTag);
>             }
>             Thread.sleep(1);
>         }
>     }
>     @Override
>     public void cancel() {
>         running = false;
>     }
> });
> source.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
>     .aggregate(new AggregateFunction<UserTag, Map<String, UserTag>, List<UserTag>>() {
>         @Override
>         public Map<String, UserTag> createAccumulator() {
>             return new HashMap<>();
>         }
>         // keeps the latest UserTag per tag name
>         @Override
>         public Map<String, UserTag> add(UserTag userTag, Map<String, UserTag> stringUserTagMap) {
>             stringUserTagMap.put(userTag.getUserTagName(), userTag);
>             return stringUserTagMap;
>         }
>         @Override
>         public List<UserTag> getResult(Map<String, UserTag> stringUserTagMap) {
>             return new ArrayList<>(stringUserTagMap.values());
>         }
>         @Override
>         public Map<String, UserTag> merge(Map<String, UserTag> acc1, Map<String, UserTag> acc2) {
>             acc1.putAll(acc2);
>             return acc1;
>         }
>     })
>     .setParallelism(1)
>     .name("NewUserTagAggregation_5s")
>     .print().setParallelism(2);
> // execute program
> env.execute(); {code}
> !image-2022-07-15-14-19-21-200.png!
> !image-2022-07-15-14-19-41-678.png!
> Reporter: Yue Shang
> Priority: Major
> Attachments: image-2022-07-15-14-19-21-200.png, image-2022-07-15-14-19-41-678.png, image-2022-07-18-19-56-45-650.png
>
>
> The RocksDB state backend is too slow when using AggregateFunction.
> With a Map<String,Object> accumulator, the job sustains only 300 records per second.
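One plausible explanation, stated here as an assumption rather than a confirmed diagnosis, is that a backend holding state in serialized form has to read, deserialize, update, re-serialize and write the whole accumulator for every incoming record. With a map accumulator that gains one entry per record, the cost per record would then grow with the size of the map. The sketch below is plain Java and does not use Flink or RocksDB; the class name `AccumulatorCostSketch` and its methods are hypothetical, and Java serialization stands in for whatever serializer the job actually uses. It only compares the bytes written when the whole map is round-tripped on every add with the bytes written when each entry is stored once.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.HashMap;

public class AccumulatorCostSketch {

    /** Serializes the whole accumulator, as a serialized-state store would on each write. */
    static byte[] serialize(HashMap<String, String> acc) {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(acc);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Deserializes the whole accumulator, as a serialized-state store would on each read. */
    @SuppressWarnings("unchecked")
    static HashMap<String, String> deserialize(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return (HashMap<String, String>) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Total bytes written when the full map is round-tripped for every record. */
    static long bytesWrittenWithSerializedState(int records) {
        byte[] state = serialize(new HashMap<>());
        long written = 0;
        for (int i = 0; i < records; i++) {
            HashMap<String, String> acc = deserialize(state); // read the full accumulator
            acc.put("tag-" + i, "value-" + i);                // the actual add()
            state = serialize(acc);                           // write the full accumulator back
            written += state.length;
        }
        return written;
    }

    /** Total characters written when only the new entry is stored for each record. */
    static long bytesWrittenPerEntry(int records) {
        long written = 0;
        for (int i = 0; i < records; i++) {
            written += ("tag-" + i).length() + ("value-" + i).length();
        }
        return written;
    }

    public static void main(String[] args) {
        for (int n : new int[] {100, 1_000, 2_000}) {
            System.out.printf("records=%d wholeMap=%d bytes perEntry=%d bytes%n",
                    n, bytesWrittenWithSerializedState(n), bytesWrittenPerEntry(n));
        }
    }
}
```

If this is indeed the cause, buffering the window's elements and building the map once when the window fires, for example with `windowAll(...).process(...)` and a `ProcessAllWindowFunction`, might avoid the per-record round trip. That is a suggestion to try, not something verified against this job.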
--
This message was sent by Atlassian Jira
(v8.20.10#820010)