Posted to dev@flink.apache.org by "Danny Cranmer (Jira)" <ji...@apache.org> on 2023/02/13 11:10:00 UTC

[jira] [Created] (FLINK-31041) Race condition in DefaultScheduler results in memory leak and busy loop

Danny Cranmer created FLINK-31041:
-------------------------------------

             Summary: Race condition in DefaultScheduler results in memory leak and busy loop
                 Key: FLINK-31041
                 URL: https://issues.apache.org/jira/browse/FLINK-31041
             Project: Flink
          Issue Type: Bug
          Components: Runtime / Coordination
    Affects Versions: 1.16.1, 1.15.3
            Reporter: Danny Cranmer
             Fix For: 1.17.0, 1.15.4, 1.16.2


h4. Context

When a job creates multiple sources that use the {{SourceCoordinator}} (FLIP-27), there is a failure race condition that results in:
 * Memory leak of {{ExecutionVertexVersion}}
 * Busy loop constantly trying to restart the job
 * The configured restart strategy not being respected

This results in the Job Manager becoming unresponsive.
h4. Reproduction Steps

This can be reproduced by a job that creates multiple sources that fail in the {{SplitEnumerator}}. We observed this with multiple {{KafkaSource}} instances trying to load a non-existent cert from the file system and throwing a {{FileNotFoundException}}. Here is a simple job to reproduce (BE WARNED: running this locally will lock up your IDE):
{code:java}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setRestartStrategy(new RestartStrategies.FailureRateRestartStrategyConfiguration(10000, Time.of(10, TimeUnit.SECONDS), Time.of(10, TimeUnit.SECONDS)));

KafkaSource<String> source = KafkaSource.<String>builder()
        .setProperty("security.protocol", "SASL_SSL")
        // SSL configurations
        // Configure the path of truststore (CA) provided by the server
        .setProperty("ssl.truststore.location", "/path/to/kafka.client.truststore.jks")
        .setProperty("ssl.truststore.password", "test1234")
        // Configure the path of keystore (private key) if client authentication is required
        .setProperty("ssl.keystore.location", "/path/to/kafka.client.keystore.jks")
        .setProperty("ssl.keystore.password", "test1234")
        // SASL configurations
        // Set SASL mechanism as SCRAM-SHA-256
        .setProperty("sasl.mechanism", "SCRAM-SHA-256")
        // Set JAAS configurations
        .setProperty("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";")
        .setBootstrapServers("http://localhost:3456")
        .setTopics("input-topic")
        .setGroupId("my-group")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();

List<SingleOutputStreamOperator<String>> sources = IntStream.range(0, 32)
        .mapToObj(i -> env
                .fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source " + i).uid("source-" + i)
                .keyBy(s -> s.charAt(0))
                .map(s -> s))
        .collect(Collectors.toList());

env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source").uid("source")
        .keyBy(s -> s.charAt(0))
        .union(sources.toArray(new SingleOutputStreamOperator[] {}))
        .print();

env.execute("test job"); {code}
h4. Root Cause

We can see that the {{OperatorCoordinatorHolder}} already has a [debounce mechanism|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java#L609], however the {{DefaultScheduler}} does not, so it processes a failure from each of the many {{OperatorCoordinatorHolder}} instances independently.
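To illustrate the kind of debounce that is missing, here is a minimal sketch of a scheduler-side guard that collapses concurrent coordinator failures into a single restart. This is purely illustrative: the class and method names are hypothetical and do not match Flink's actual {{DefaultScheduler}} API.
{code:java}
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical sketch: collapse a burst of coordinator failures into a
 * single restart, mirroring the debounce OperatorCoordinatorHolder
 * already performs. Not Flink's real API.
 */
public class FailureDebounce {
    private final AtomicBoolean restartPending = new AtomicBoolean(false);
    private final AtomicInteger restartsTriggered = new AtomicInteger(0);

    /** Called once per failing coordinator; only the first call wins. */
    public void onCoordinatorFailure(Throwable cause) {
        // compareAndSet ensures that failures arriving while a restart
        // is already pending are dropped instead of each scheduling
        // their own restart (the busy loop described above).
        if (restartPending.compareAndSet(false, true)) {
            restartsTriggered.incrementAndGet(); // stand-in for the real restart
        }
    }

    /** Reset after the restart has completed, re-arming the guard. */
    public void restartCompleted() {
        restartPending.set(false);
    }

    public int restartsTriggered() {
        return restartsTriggered.get();
    }

    public static void main(String[] args) {
        FailureDebounce d = new FailureDebounce();
        // 33 sources (as in the repro job) all fail at once:
        for (int i = 0; i < 33; i++) {
            d.onCoordinatorFailure(new RuntimeException("boom " + i));
        }
        System.out.println(d.restartsTriggered()); // one restart, not 33
    }
}
{code}
With a guard like this, the burst of failures from all 33 sources in the repro job would cost one restart-strategy attempt rather than one per source.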

h4. Fix

I have managed to fix this and will open a PR, but I would need feedback from people who understand this code better than I do!

--
This message was sent by Atlassian Jira
(v8.20.10#820010)