Posted to issues@flink.apache.org by "Weijie Guo (Jira)" <ji...@apache.org> on 2022/11/22 12:53:00 UTC

[jira] [Comment Edited] (FLINK-30131) flink iterate will suspend when record is a bit large

    [ https://issues.apache.org/jira/browse/FLINK-30131?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17637165#comment-17637165 ] 

Weijie Guo edited comment on FLINK-30131 at 11/22/22 12:52 PM:
---------------------------------------------------------------

[~landlord] Thanks for reporting this.

I have not yet investigated this issue in depth, but here is my initial understanding:
1. The stack shows that the thread is stuck in `requestMemorySegmentBlocking` while writing shuffle data. In theory it should not block forever: buffers are recycled and reused as the downstream consumes them. So, is your job hanging forever?
2. In which environment did you run this job: local, or YARN/K8S? That would help reproduce the problem. Can you also provide your flink-conf.yaml?
3. If you need more network buffers, it is enough to increase the total TM memory and the network memory (see the sketch below). Adjusting `taskmanager.network.memory.buffers-per-channel` and `taskmanager.network.memory.floating-buffers-per-gate` will not solve this problem; it will only make buffer requests more contended.
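For reference, a minimal sketch of that direction; the concrete sizes below are illustrative assumptions, not tuned recommendations:

{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;

// Sketch: grow the overall network memory pool instead of the
// per-channel / per-gate buffer counts. Sizes are placeholders.
Configuration conf = new Configuration();
conf.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("8g"));
conf.set(TaskManagerOptions.NETWORK_MEMORY_MIN, MemorySize.parse("2g"));
conf.set(TaskManagerOptions.NETWORK_MEMORY_MAX, MemorySize.parse("2g"));
{code}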

TBH, I am not familiar with the implementation of iteration; cc [~gaoyunhaii] for more professional advice.

 



> flink iterate will suspend when record is a bit large
> -----------------------------------------------------
>
>                 Key: FLINK-30131
>                 URL: https://issues.apache.org/jira/browse/FLINK-30131
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>    Affects Versions: 1.15.2
>            Reporter: Lu
>            Priority: Major
>         Attachments: image-2022-11-22-14-59-08-272.png
>
>
>  
> {code:java}
> import java.util.ArrayList;
> import java.util.List;
>
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.configuration.MemorySize;
> import org.apache.flink.configuration.RestOptions;
> import org.apache.flink.configuration.TaskManagerOptions;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> import org.apache.flink.streaming.api.datastream.IterativeStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.ProcessFunction;
> import org.apache.flink.util.Collector;
>
> import static org.apache.flink.configuration.NettyShuffleEnvironmentOptions.NETWORK_MAX_BUFFERS_PER_CHANNEL;
>
> // Local reproduction: iterate over ~10 MB records with an enlarged network buffer setup.
> Configuration configuration = new Configuration();
> configuration.setInteger(RestOptions.PORT, 8082);
> configuration.setInteger(NETWORK_MAX_BUFFERS_PER_CHANNEL, 10000000);
> configuration.set(TaskManagerOptions.NETWORK_MEMORY_MAX, MemorySize.parse("4g"));
> configuration.setInteger("taskmanager.network.memory.buffers-per-channel", 10000000);
> configuration.setInteger("taskmanager.network.memory.floating-buffers-per-gate", 10000000);
> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
> env.setParallelism(1);
> List<Integer> list = new ArrayList<>(10);
> for (int i = 1; i < 10000; i++) {
>     list.add(i);
> }
> DataStreamSource<Integer> integerDataStreamSource = env.fromCollection(list);
> // Each record becomes a 10 MB byte array before entering the iteration.
> DataStream<byte[]> map = integerDataStreamSource.map(i -> new byte[10000000]).setParallelism(1).name("map to byte[]").shuffle();
> IterativeStream<byte[]> iterate = map.iterate();
> DataStream<byte[]> map1 = iterate.process(new ProcessFunction<byte[], byte[]>() {
>     @Override
>     public void processElement(byte[] value, ProcessFunction<byte[], byte[]>.Context ctx, Collector<byte[]> out) throws Exception {
>         out.collect(value);
>     }
> }).name("multi collect");
> // Feedback edge: every record is fed back into the iteration.
> DataStream<byte[]> filter = map1.filter(i -> true).setParallelism(1).name("feedback");
> iterate.closeWith(filter);
> map1.map(bytes -> bytes.length).name("map to length").print();
> env.execute();
> {code}
> My code is above.
>
> When I use iterate with somewhat large records, the iteration suspends at a random point. When I looked at the stack, I found a suspicious thread:
> !image-2022-11-22-14-59-08-272.png|width=751,height=328!
> It seems like a network-related problem, so I increased the network buffer memory and buffer count, but that only delays the suspension point: the job still suspends after iterating a few more times than before.
> I want to know whether this is a bug, or whether there is an error in my code or configuration.
> Looking forward to your reply. Thanks in advance.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)