Posted to dev@flink.apache.org by "Lu (Jira)" <ji...@apache.org> on 2022/11/22 07:08:00 UTC

[jira] [Created] (FLINK-30131) flink iterate will suspend when record is a bit large

Lu created FLINK-30131:
--------------------------

             Summary: flink iterate will suspend when record is a bit large
                 Key: FLINK-30131
                 URL: https://issues.apache.org/jira/browse/FLINK-30131
             Project: Flink
          Issue Type: Bug
          Components: API / DataStream
    Affects Versions: 1.15.2
            Reporter: Lu
         Attachments: image-2022-11-22-14-59-08-272.png

 
{code:java}
// Code placeholder
Configuration configuration = new Configuration();
configuration.setInteger(RestOptions.PORT, 8082);
configuration.setInteger(NETWORK_MAX_BUFFERS_PER_CHANNEL, 10000000);
configuration.set(TaskManagerOptions.NETWORK_MEMORY_MAX, MemorySize.parse("4g"));
configuration.setInteger("taskmanager.network.memory.buffers-per-channel", 10000000);
configuration.setInteger("taskmanager.network.memory.floating-buffers-per-gate", 10000000);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
env.setParallelism(1);
List<Integer> list = new ArrayList<>(10);
for (int i = 1; i < 10000; i++) {
    list.add(i);
}
DataStreamSource<Integer> integerDataStreamSource = env.fromCollection(list);
DataStream<byte[]> map = integerDataStreamSource.map(i -> new byte[10000000]).setParallelism(1).name("map to byte[]").shuffle();
IterativeStream<byte[]> iterate = map.iterate();
DataStream<byte[]> map1 = iterate.process(new ProcessFunction<byte[], byte[]>() {
    @Override
    public void processElement(byte[] value, ProcessFunction<byte[], byte[]>.Context ctx, Collector<byte[]> out) throws Exception {
        out.collect(value);
    }
}).name("multi collect");
DataStream<byte[]> filter = map1.filter(i -> true).setParallelism(1).name("feedback");
iterate.closeWith(filter);
map1.map(bytes -> bytes.length).name("map to length").print();
env.execute();
{code}
My code is above.

 

When I use iterate with large records, the iteration hangs at a random point. When I looked at the thread dump, I found a suspicious thread:

!image-2022-11-22-14-59-08-272.png|width=751,height=328!

It seems to be a network-related problem, so I increased the network buffer memory and the number of buffers. That only delays the hang: the job still stalls, just after a few more iterations than before.
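
For a rough sense of scale, the sketch below estimates how many network buffers a single record from the job above would span. It assumes the default 32 KiB buffer size (that is, taskmanager.memory.segment-size is not overridden) and ignores serialization overhead. The class and method names are made up for illustration and are not part of Flink, and the estimate is not meant as a diagnosis of the hang.

```java
public class BufferEstimate {
    // Assumption: Flink's default network buffer (memory segment) size of 32 KiB.
    static final int SEGMENT_SIZE_BYTES = 32 * 1024;

    // Number of buffers needed to hold one record of the given size (ceiling division).
    static long buffersPerRecord(long recordBytes) {
        return (recordBytes + SEGMENT_SIZE_BYTES - 1) / SEGMENT_SIZE_BYTES;
    }

    public static void main(String[] args) {
        long recordBytes = 10_000_000L; // size of each byte[] emitted by the map above
        System.out.println("buffers per record: " + buffersPerRecord(recordBytes));
    }
}
```

Under these assumptions each record occupies a few hundred buffers, so every record in flight on the feedback edge holds that many buffers at once.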

I want to know whether this is a bug or whether there is an error in my code or configuration.

Looking forward to your reply. Thanks in advance.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)