Posted to dev@flink.apache.org by "Danny Cranmer (Jira)" <ji...@apache.org> on 2023/02/13 11:10:00 UTC
[jira] [Created] (FLINK-31041) Race condition in DefaultScheduler results in memory leak and busy loop
Danny Cranmer created FLINK-31041:
-------------------------------------
Summary: Race condition in DefaultScheduler results in memory leak and busy loop
Key: FLINK-31041
URL: https://issues.apache.org/jira/browse/FLINK-31041
Project: Flink
Issue Type: Bug
Components: Runtime / Coordination
Affects Versions: 1.16.1, 1.15.3
Reporter: Danny Cranmer
Fix For: 1.17.0, 1.15.4, 1.16.2
h4. Context
When a job creates multiple sources that use the {{SourceCoordinator}} (FLIP-27), there is a failure race condition that results in:
* Memory leak of {{ExecutionVertexVersion}}
* Busy loop constantly trying to restart the job
* Restart strategy is not respected
This results in the Job Manager becoming unresponsive.
h4. Reproduction Steps
This can be reproduced by a job that creates multiple sources that fail in the {{SplitEnumerator}}. We observed this with multiple {{KafkaSource}}s trying to load a non-existent certificate from the file system and throwing a {{FileNotFoundException}}. Here is a simple job to reproduce it (BE WARNED: running this locally will lock up your IDE):
{code:java}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setRestartStrategy(new RestartStrategies.FailureRateRestartStrategyConfiguration(
        10000, Time.of(10, TimeUnit.SECONDS), Time.of(10, TimeUnit.SECONDS)));

KafkaSource<String> source = KafkaSource.<String>builder()
        .setProperty("security.protocol", "SASL_SSL")
        // SSL configurations
        // Configure the path of the truststore (CA) provided by the server
        .setProperty("ssl.truststore.location", "/path/to/kafka.client.truststore.jks")
        .setProperty("ssl.truststore.password", "test1234")
        // Configure the path of the keystore (private key) if client authentication is required
        .setProperty("ssl.keystore.location", "/path/to/kafka.client.keystore.jks")
        .setProperty("ssl.keystore.password", "test1234")
        // SASL configurations
        // Set SASL mechanism as SCRAM-SHA-256
        .setProperty("sasl.mechanism", "SCRAM-SHA-256")
        // Set JAAS configurations
        .setProperty("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";")
        .setBootstrapServers("http://localhost:3456")
        .setTopics("input-topic")
        .setGroupId("my-group")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();

List<SingleOutputStreamOperator<String>> sources = IntStream.range(0, 32)
        .mapToObj(i -> env
                .fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source " + i)
                .uid("source-" + i)
                .keyBy(s -> s.charAt(0))
                .map(s -> s))
        .collect(Collectors.toList());

env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")
        .uid("source")
        .keyBy(s -> s.charAt(0))
        .union(sources.toArray(new SingleOutputStreamOperator[] {}))
        .print();

env.execute("test job");
{code}
h4. Root Cause
We can see that the {{OperatorCoordinatorHolder}} already has a [debounce mechanism|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java#L609], however the {{DefaultScheduler}} does not, even though it processes failures from many {{OperatorCoordinatorHolder}} instances. So each failed source coordinator triggers its own restart attempt.
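To illustrate the pattern (this is a hypothetical sketch, not Flink code: the class and method names below are made up), a debounce of this kind coalesces concurrent failure reports so that only the first one schedules a restart, while the rest are swallowed until the restart completes:

{code:java}
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of a failure-debounce guard. Many source coordinators
// may report failures concurrently; only the first report should schedule a
// global restart, the others are coalesced into it.
class FailureDebouncer {

    private final AtomicBoolean restartPending = new AtomicBoolean(false);

    /** Returns true only for the first caller; concurrent callers see false. */
    boolean tryScheduleRestart() {
        return restartPending.compareAndSet(false, true);
    }

    /** Re-arms the guard once the restart has actually completed. */
    void restartFinished() {
        restartPending.set(false);
    }
}
{code}

With a guard like this in front of the scheduler, 32 sources failing at once would lead to a single restart attempt rather than 32 queued ones, which is the behaviour the missing debounce would provide.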
h4. Fix
I have managed to fix this and will open a PR, but I would need feedback from people who understand this code better than I do!
--
This message was sent by Atlassian Jira
(v8.20.10#820010)