Posted to issues@flink.apache.org by "Lu (Jira)" <ji...@apache.org> on 2022/11/24 09:11:00 UTC

[jira] [Updated] (FLINK-30131) flink iterate will suspend when record is a bit large

     [ https://issues.apache.org/jira/browse/FLINK-30131?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Lu updated FLINK-30131:
-----------------------
    Attachment: image-2022-11-24-17-10-45-651.png

> flink iterate will suspend when record is a bit large
> -----------------------------------------------------
>
>                 Key: FLINK-30131
>                 URL: https://issues.apache.org/jira/browse/FLINK-30131
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>    Affects Versions: 1.15.2
>            Reporter: Lu
>            Priority: Major
>         Attachments: image-2022-11-22-14-59-08-272.png, image-2022-11-24-17-10-45-651.png
>
>
>  
> {code:java}
> // code placeholder
> Configuration configuration = new Configuration();
> configuration.setInteger(RestOptions.PORT, 8082);
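> // raise the network buffer limits far above the defaults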
> configuration.setInteger(NETWORK_MAX_BUFFERS_PER_CHANNEL, 10000000);
> configuration.set(TaskManagerOptions.NETWORK_MEMORY_MAX, MemorySize.parse("4g"));
> configuration.setInteger("taskmanager.network.memory.buffers-per-channel", 10000000);
> configuration.setInteger("taskmanager.network.memory.floating-buffers-per-gate", 10000000);
> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
> env.setParallelism(1);
> List<Integer> list = new ArrayList<>(10);
> for (int i = 1; i < 10000; i++) {
>     list.add(i);
> }
> DataStreamSource<Integer> integerDataStreamSource = env.fromCollection(list);
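> // each record becomes a ~10 MB byte[] before entering the iteration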
> DataStream<byte[]> map = integerDataStreamSource.map(i -> new byte[10000000]).setParallelism(1).name("map to byte[]").shuffle();
> IterativeStream<byte[]> iterate = map.iterate();
> DataStream<byte[]> map1 = iterate.process(new ProcessFunction<byte[], byte[]>() {
>     @Override
>     public void processElement(byte[] value, ProcessFunction<byte[], byte[]>.Context ctx, Collector<byte[]> out) throws Exception {
>         out.collect(value);
>     }
> }).name("multi collect");
> DataStream<byte[]> filter = map1.filter(i -> true).setParallelism(1).name("feedback");
> iterate.closeWith(filter);
> map1.map(bytes -> bytes.length).name("map to length").print();
> env.execute(); {code}
> My code for reproducing this is above.
>  
> When I use iterate() with large records (each element here is a ~10 MB byte[]), the iteration suspends at a random point. When I looked at the thread dump, there was a suspicious thread:
> !image-2022-11-22-14-59-08-272.png|width=751,height=328!
> It looks like a network-related problem, so I increased the network buffer memory and the number of buffers (see the configuration at the top of the code; the typed options I believe those raw keys correspond to are sketched below). However, this only delays the hang: the job still suspends after a few more iterations than before.
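> For reference, here is a sketch of the same buffer-related settings written with typed config options instead of raw string keys. It assumes the Flink 1.15 option fields in NettyShuffleEnvironmentOptions and TaskManagerOptions; please correct me if the mapping is off:
> {code:java}
> // Sketch only: assumed 1.15 typed equivalents of the raw keys used above.
> Configuration conf = new Configuration();
> // upper bound of the network memory pool ("taskmanager.memory.network.max")
> conf.set(TaskManagerOptions.NETWORK_MEMORY_MAX, MemorySize.parse("4g"));
> // cap on buffers a single channel may use ("taskmanager.network.memory.max-buffers-per-channel")
> conf.setInteger(NettyShuffleEnvironmentOptions.NETWORK_MAX_BUFFERS_PER_CHANNEL, 10000000);
> // exclusive buffers per channel ("taskmanager.network.memory.buffers-per-channel")
> conf.setInteger(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_PER_CHANNEL, 10000000);
> // floating buffers per gate ("taskmanager.network.memory.floating-buffers-per-gate")
> conf.setInteger(NettyShuffleEnvironmentOptions.NETWORK_EXTRA_BUFFERS_PER_GATE, 10000000);
> {code}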
> I would like to know whether this is a bug or whether there is a mistake in my code or configuration.
> Looking forward to your reply. Thanks in advance.
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)