Posted to issues@flink.apache.org by "Lu (Jira)" <ji...@apache.org> on 2022/11/24 09:11:00 UTC
[jira] [Updated] (FLINK-30131) flink iterate will suspend when record is a bit large
[ https://issues.apache.org/jira/browse/FLINK-30131?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Lu updated FLINK-30131:
-----------------------
Attachment: image-2022-11-24-17-10-45-651.png
> flink iterate will suspend when record is a bit large
> -----------------------------------------------------
>
> Key: FLINK-30131
> URL: https://issues.apache.org/jira/browse/FLINK-30131
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream
> Affects Versions: 1.15.2
> Reporter: Lu
> Priority: Major
> Attachments: image-2022-11-22-14-59-08-272.png, image-2022-11-24-17-10-45-651.png
>
>
>
> {code:java}
> // code placeholder
> Configuration configuration = new Configuration();
> configuration.setInteger(RestOptions.PORT, 8082);
> configuration.setInteger(NETWORK_MAX_BUFFERS_PER_CHANNEL, 10000000);
> configuration.set(TaskManagerOptions.NETWORK_MEMORY_MAX, MemorySize.parse("4g"));
> configuration.setInteger("taskmanager.network.memory.buffers-per-channel", 10000000);
> configuration.setInteger("taskmanager.network.memory.floating-buffers-per-gate", 10000000);
> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
> env.setParallelism(1);
> List<Integer> list = new ArrayList<>(10000);
> for (int i = 1; i < 10000; i++) {
>     list.add(i);
> }
> DataStreamSource<Integer> integerDataStreamSource = env.fromCollection(list);
> DataStream<byte[]> map = integerDataStreamSource.map(i -> new byte[10000000]).setParallelism(1).name("map to byte[]").shuffle();
> IterativeStream<byte[]> iterate = map.iterate();
> DataStream<byte[]> map1 = iterate.process(new ProcessFunction<byte[], byte[]>() {
>     @Override
>     public void processElement(byte[] value, ProcessFunction<byte[], byte[]>.Context ctx, Collector<byte[]> out) throws Exception {
>         out.collect(value);
>     }
> }).name("multi collect");
> DataStream<byte[]> filter = map1.filter(i -> true).setParallelism(1).name("feedback");
> iterate.closeWith(filter);
> map1.map(bytes -> bytes.length).name("map to length").print();
> env.execute(); {code}
> My code is above.
>
> When I use iterate with records that are a bit large, the iteration suspends at a random point. Looking at the thread dump, there is a suspicious thread:
> !image-2022-11-22-14-59-08-272.png|width=751,height=328!
> It looks like a network-related problem, so I increased the network buffer memory and the number of buffers. However, that only delays the point of suspension; the job still hangs after a few more iterations than before.
> I would like to know whether this is a bug, or whether there is an error in my code or configuration.
> Looking forward to your reply. Thanks in advance.
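>
> For reference, the network settings I raised programmatically in the snippet above should correspond to the following flink-conf.yaml keys (values are the ones from my test, not a tuning recommendation):
> {code}
> taskmanager.network.memory.max-buffers-per-channel: 10000000
> taskmanager.network.memory.buffers-per-channel: 10000000
> taskmanager.network.memory.floating-buffers-per-gate: 10000000
> taskmanager.memory.network.max: 4g
> {code}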
>
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)