Posted to issues@flink.apache.org by "Yue Shang (Jira)" <ji...@apache.org> on 2022/07/15 06:21:00 UTC
[jira] [Created] (FLINK-28562) Rocksdb state backend is too slow when using AggregateFunction
Yue Shang created FLINK-28562:
---------------------------------
Summary: Rocksdb state backend is too slow when using AggregateFunction
Key: FLINK-28562
URL: https://issues.apache.org/jira/browse/FLINK-28562
Project: Flink
Issue Type: Bug
Affects Versions: 1.14.3, 1.13.2
Environment: {code:java}
final ParameterTool params = ParameterTool.fromArgs(args);
// set up the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// get input data: an unbounded source that emits 100 records, then sleeps 1 ms, in a loop
DataStream<UserTag> source = env.addSource(new SourceFunction<UserTag>() {
    // cleared by cancel() so that the emit loop can terminate
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<UserTag> ctx) throws Exception {
        Random rd = new Random();
        while (running) {
            UserTag userTag = new UserTag();
            userTag.setUserId(rd.nextLong());
            userTag.setMetricName(UUID.randomUUID().toString());
            userTag.setTagDimension(UUID.randomUUID().toString());
            userTag.setTagValue(rd.nextDouble());
            // epoch milliseconds truncated to int, as in the original report
            userTag.setTagTime((int) System.currentTimeMillis());
            userTag.setMonitorTime((int) System.currentTimeMillis());
            userTag.setAggregationPeriod("5s");
            userTag.setAggregationType("sum");
            userTag.setTimePeriod("hour");
            userTag.setDataType("number");
            userTag.setBaseTime(1657803600);
            userTag.setTopic(UUID.randomUUID().toString());
            for (int i = 0; i < 100; i++) {
                // every emitted record gets a unique tag name
                userTag.setUserTagName(UUID.randomUUID() + "-" + i);
                ctx.collect(userTag);
            }
            Thread.sleep(1);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
});
source.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
        .aggregate(new AggregateFunction<UserTag, Map<String, UserTag>, List<UserTag>>() {
            @Override
            public Map<String, UserTag> createAccumulator() {
                return new HashMap<>();
            }

            // the accumulator is a map keyed by tag name; it gains one entry per unique name
            @Override
            public Map<String, UserTag> add(UserTag userTag, Map<String, UserTag> stringUserTagMap) {
                stringUserTagMap.put(userTag.getUserTagName(), userTag);
                return stringUserTagMap;
            }

            @Override
            public List<UserTag> getResult(Map<String, UserTag> stringUserTagMap) {
                return new ArrayList<>(stringUserTagMap.values());
            }

            @Override
            public Map<String, UserTag> merge(Map<String, UserTag> acc1, Map<String, UserTag> acc2) {
                acc1.putAll(acc2);
                return acc1;
            }
        })
        .setParallelism(1)
        .name("NewUserTagAggregation_5s")
        .print().setParallelism(2);
// execute program
env.execute();
{code}
!image-2022-07-15-14-19-21-200.png!
!image-2022-07-15-14-19-41-678.png!
Reporter: Yue Shang
Attachments: image-2022-07-15-14-19-21-200.png, image-2022-07-15-14-19-41-678.png
The RocksDB state backend is too slow when using an AggregateFunction.
With a Map<String,Object> accumulator, the job sustains only about 300 records per second.
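One possible explanation, not confirmed in this report, is that a backend which keeps state in serialized form has to read, deserialize, update, and re-serialize the whole accumulator on every add() call, so a map that grows with each record gets more expensive per record. The sketch below is a simplified model of that pattern in plain Java and is not Flink code: the class AccumulatorCostSketch and its methods are hypothetical, and Java serialization stands in for Flink's own serializers. It counts how many map entries are written in total when the accumulator lives on the heap versus when it is stored as bytes between updates.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.HashMap;

// Hypothetical model, not part of Flink: compares the write cost of a growing
// map accumulator kept as a heap object with one stored as serialized bytes.
final class AccumulatorCostSketch {

    // Serializes the whole map; stands in for a backend's value serializer.
    static byte[] serialize(HashMap<String, Double> map) {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(map);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Rebuilds the whole map from its stored bytes.
    @SuppressWarnings("unchecked")
    static HashMap<String, Double> deserialize(byte[] stored) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(stored))) {
            return (HashMap<String, Double>) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // Heap model: each add touches exactly one entry.
    static long heapStyleAdds(int n) {
        HashMap<String, Double> acc = new HashMap<>();
        long entriesWritten = 0;
        for (int i = 0; i < n; i++) {
            acc.put("tag-" + i, (double) i);
            entriesWritten += 1;
        }
        return entriesWritten;
    }

    // Serialized model: each add reads the full map, updates it, and writes it all back.
    static long serializedStyleAdds(int n) {
        byte[] stored = serialize(new HashMap<>());
        long entriesWritten = 0;
        for (int i = 0; i < n; i++) {
            HashMap<String, Double> acc = deserialize(stored);
            acc.put("tag-" + i, (double) i);
            stored = serialize(acc);
            entriesWritten += acc.size();
        }
        return entriesWritten;
    }

    public static void main(String[] args) {
        int n = 1000;
        System.out.println("heap-style entries written:       " + heapStyleAdds(n));
        System.out.println("serialized-style entries written: " + serializedStyleAdds(n));
    }
}
```

In this model the heap variant writes n entries for n records, while the serialized variant writes n(n+1)/2, for example 5050 entries for 100 records. Whether this accounts for the throughput reported here would need to be checked against the attached screenshots and a profile of the actual job.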
--
This message was sent by Atlassian Jira
(v8.20.10#820010)