Posted to issues@flink.apache.org by "Yue Shang (Jira)" <ji...@apache.org> on 2022/07/18 12:37:00 UTC

[jira] [Commented] (FLINK-28562) Rocksdb state backend is too slow when using AggregateFunction

    [ https://issues.apache.org/jira/browse/FLINK-28562?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17567969#comment-17567969 ] 

Yue Shang commented on FLINK-28562:
-----------------------------------

Sorry, I forgot my env opts:

-Dstate.backend.incremental=true
-Drest.flamegraph.enabled=true
-Dstate.backend=rocksdb
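
For reference, a minimal sketch of the equivalent programmatic setup (only an illustration; in the actual runs these options were passed as -D flags):
{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Equivalent of the -D flags above, set programmatically (illustration only).
Configuration conf = new Configuration();
conf.setString("state.backend", "rocksdb");
conf.setBoolean("state.backend.incremental", true);
conf.setBoolean("rest.flamegraph.enabled", true);

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);{code}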

 

0.numRecordsInPerSecond is only about 170/s when using rocksdb, but 0.numRecordsInPerSecond reaches about 70000/s when state.backend is changed to hashmap.
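
The two backends could also be swapped directly in the job for this comparison (a sketch; here they were switched via the state.backend option):
{code:java}
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;

// ~170 records/s in this job: RocksDB backend (incremental checkpoints enabled).
env.setStateBackend(new EmbeddedRocksDBStateBackend(true));

// ~70000 records/s in this job: heap-based HashMap backend.
// env.setStateBackend(new HashMapStateBackend());{code}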

 

I also registered the Kryo types:
{code:java}
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().registerKryoType(Map.class);
env.getConfig().registerKryoType(UserTag.class);
env.getConfig().registerKryoType(List.class);{code}
but that did not help either; throughput is still only about 300/s. The flame graph looks very bad.

!image-2022-07-18-20-34-54-407.png!

!image-2022-07-18-20-36-23-021.png!
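
As a side note (not something from the original run): one way to check whether UserTag and the Map accumulator really fall back to Kryo is to disable generic types; Flink then throws an exception as soon as a type would be serialized by Kryo.
{code:java}
// Diagnostic sketch only: with generic types disabled, Flink fails fast
// whenever a data type would go through Kryo for serialization.
env.getConfig().disableGenericTypes();{code}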

All application mode JobManager configuration:
$internal.application.main WindowAggTest
$internal.application.program-args --nub;10000
$internal.deployment.config-dir /data/services/-202207062015-master-7cf96f44/bin/bylink/flink/flink-env/flink-1.14.3/conf
$internal.yarn.log-config-file /data/services/-202207062015-master-7cf96f44/bin/bylink/flink/flink-env/flink-1.14.3/conf/log4j.properties
classloader.resolve-order parent-first
execution.checkpointing.externalized-checkpoint-retention RETAIN_ON_CANCELLATION
execution.checkpointing.interval 600s
execution.checkpointing.mode EXACTLY_ONCE
execution.checkpointing.timeout 60s
execution.checkpointing.tolerable-failed-checkpoints 3
execution.target embedded
high-availability.cluster-id application_*******_****
internal.cluster.execution-mode NORMAL
internal.io.tmpdirs.use-local-default true
io.tmp.dirs /data/storage/yarn/local/usercache/*****/appcache/application_*******_****
jobmanager.archive.fs.dir hdfs:///****/flink/completed-jobs/
jobmanager.memory.heap.size 469762048b
jobmanager.memory.jvm-metaspace.size 268435456b
jobmanager.memory.jvm-overhead.max 201326592b
jobmanager.memory.jvm-overhead.min 201326592b
jobmanager.memory.off-heap.size 134217728b
jobmanager.memory.process.size 1024m
jobmanager.rpc.address ****
jobmanager.rpc.port ****
parallelism.default 1
pipeline.classpaths
pipeline.jars file:/********/container_****/WindowsAggTest-1.0-SNAPSHOT.jar
pipeline.name 1757_Admin_aggTest
rest.address **********
rest.flamegraph.enabled true
state.backend rocksdb
state.backend.incremental true
state.checkpoint-storage filesystem
state.checkpoints.dir hdfs:///******/flink/checkpoints/1757
state.checkpoints.num-retained 10
state.savepoints.dir hdfs:///******/flink/checkpoints/1757
taskmanager.memory.process.size 8192m
taskmanager.numberOfTaskSlots 1
web.port 0
web.tmpdir /tmp/flink-web-********8
yarn.application.name 1757_Admin_aggTest
yarn.application.queue ************
yarn.application.type Apache Flink 1.14.3
yarn.flink-dist-jar hdfs:///*******/flink/flink-env/flink-1.14.3/lib/flink-dist_2.12-1.14.3.jar
yarn.provided.lib.dirs hdfs:///*********/flink/flink-env/flink-1.14.3/lib
 

> Rocksdb state backend is too slow when using AggregateFunction
> --------------------------------------------------------------
>
>                 Key: FLINK-28562
>                 URL: https://issues.apache.org/jira/browse/FLINK-28562
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.13.2, 1.14.3
>         Environment: {code:java}
> final ParameterTool params = ParameterTool.fromArgs(args);
> // set up the execution environment
> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> // get input data
> DataStream<UserTag> source = env.addSource(new SourceFunction<UserTag>() {
>     @Override
>     public void run(SourceContext ctx) throws Exception {
>         Random rd = new Random();
>         while (true){
>             UserTag userTag = new UserTag();
>             userTag.setUserId(rd.nextLong());
>             userTag.setMetricName(UUID.randomUUID().toString());
>             userTag.setTagDimension(UUID.randomUUID().toString());
>             userTag.setTagValue(rd.nextDouble());
>             userTag.setTagTime( new Long(new Date().getTime()).intValue());
>             userTag.setMonitorTime(new Long(new Date().getTime()).intValue());
>             userTag.setAggregationPeriod("5s");
>             userTag.setAggregationType("sum");
>             userTag.setTimePeriod("hour");
>             userTag.setDataType("number");
>             userTag.setBaseTime(1657803600);
>             userTag.setTopic(UUID.randomUUID().toString());
>             for(int i = 0;i<100;i++){
>                 userTag.setUserTagName(UUID.randomUUID() + "-"+i);
>                 ctx.collect(userTag);
>             }
>             Thread.sleep(1);
>         }
>     }
>     @Override
>     public void cancel() {
>     }
> });
> source.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
>         .aggregate(new AggregateFunction<UserTag, Map<String,UserTag>, List<UserTag>>(){
>             @Override
>             public Map<String, UserTag> createAccumulator() {
>                 return new HashMap<>();
>             }
>             @Override
>             public Map<String, UserTag> add(UserTag userTag, Map<String, UserTag> stringUserTagMap) {
>                 stringUserTagMap.put(userTag.getUserTagName(),userTag);
>                 return stringUserTagMap;
>             }
>             @Override
>             public List<UserTag> getResult(Map<String, UserTag> stringUserTagMap) {
>                 return new ArrayList<>(stringUserTagMap.values());
>             }
>             @Override
>             public Map<String, UserTag> merge(Map<String, UserTag> acc1, Map<String, UserTag> acc2) {
>                 acc1.putAll(acc2);
>                 return acc1;
>             }
>         })
>         .setParallelism(1)
>         .name("NewUserTagAggregation_5s")
>         .print().setParallelism(2);
> // execute program
> env.execute(); {code}
> !image-2022-07-15-14-19-21-200.png!
> !image-2022-07-15-14-19-41-678.png!
>            Reporter: Yue Shang
>            Priority: Major
>         Attachments: image-2022-07-15-14-19-21-200.png, image-2022-07-15-14-19-41-678.png, image-2022-07-18-19-56-45-650.png, image-2022-07-18-19-59-18-872.png, image-2022-07-18-20-34-54-407.png, image-2022-07-18-20-36-23-021.png
>
>
> Rocksdb state backend is too slow when using AggregateFunction.
> It only handles about 300 records per second when the accumulator is a Map<String,Object>.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)