Posted to issues@flink.apache.org by "Matthias (Jira)" <ji...@apache.org> on 2021/10/11 11:04:00 UTC

[jira] [Created] (FLINK-24506) checkpoint directory is not configurable through the Flink configuration passed into the StreamExecutionEnvironment

Matthias created FLINK-24506:
--------------------------------

             Summary: checkpoint directory is not configurable through the Flink configuration passed into the StreamExecutionEnvironment
                 Key: FLINK-24506
                 URL: https://issues.apache.org/jira/browse/FLINK-24506
             Project: Flink
          Issue Type: Bug
          Components: Runtime / Configuration, Runtime / State Backends
    Affects Versions: 1.13.2, 1.14.0
            Reporter: Matthias


FLINK-19463 introduced the separation of `StateBackend` and `CheckpointStorage`. Before that, both were included in the same interface implementation [AbstractFileStateBackend|https://github.com/apache/flink/blob/0a76daba0a428a322f0273d7dc6a70966f62bf26/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateBackend.java]. `FsStateBackend` was used as a default implementation pre-1.13.

Versions before {{1.13}} initialized the checkpoint directory when instantiating the state backend (see [FsStateBackendFactory|https://github.com/apache/flink/blob/release-1.12/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java#L46]). Starting with {{1.13}}, the {{CheckpointStorage}} is loaded by the {{CheckpointStorageLoader.load}} method, which is called in several places:
* Savepoint disposal (through {{Checkpoints.loadCheckpointStorage}}), where it relies only on the cluster configuration (no application checkpoint storage is passed)
* {{SchedulerBase}} initialization (through {{DefaultExecutionGraphBuilder}}), where it is based on the cluster’s configuration and also on the application configuration (i.e. the {{JobGraph}}’s setting); the latter would be considered if {{CheckpointConfig#configure}} included the checkpoint storage
* {{StreamTask}} on the {{TaskManager}}’s side, where it is based on the configuration passed in by the {{JobVertex}} for the application’s {{CheckpointStorage}} and on the {{TaskManager}}’s configuration (coming from the session cluster) for the fallback {{CheckpointStorage}}

The issue is that the checkpoint directory is not set in the {{CheckpointConfig}}. Hence, it is not picked up as a job-related property, and Flink always uses the fallback provided by the session cluster configuration.
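
The fallback described above can be illustrated with a small stand-alone model. This is only a sketch using plain Java maps, not Flink's actual classes: the class {{CheckpointDirFallbackSketch}} and all of its methods are hypothetical, and {{state.checkpoints.dir}} is assumed to be the configuration key that carries the checkpoint directory.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Simplified, hypothetical model of the fallback; none of this is Flink code. */
class CheckpointDirFallbackSketch {

    // Assumed configuration key for the checkpoint directory.
    static final String CHECKPOINTS_DIR = "state.checkpoints.dir";

    /** A job-level storage setting wins; otherwise the cluster configuration is used. */
    static Optional<String> resolveCheckpointDir(
            Optional<String> jobLevelStorage, Map<String, String> clusterConfig) {
        if (jobLevelStorage.isPresent()) {
            return jobLevelStorage;
        }
        return Optional.ofNullable(clusterConfig.get(CHECKPOINTS_DIR));
    }

    /** Models the reported behaviour: the directory in the environment's configuration is ignored. */
    static Optional<String> jobLevelStorageAsReported(Map<String, String> envConfig) {
        return Optional.empty();
    }

    /** Models the expected behaviour: the directory becomes a job-level property. */
    static Optional<String> jobLevelStorageAsExpected(Map<String, String> envConfig) {
        return Optional.ofNullable(envConfig.get(CHECKPOINTS_DIR));
    }

    public static void main(String[] args) {
        Map<String, String> clusterConfig = new HashMap<>();
        clusterConfig.put(CHECKPOINTS_DIR, "file:///cluster/checkpoints");

        Map<String, String> envConfig = new HashMap<>();
        envConfig.put(CHECKPOINTS_DIR, "file:///job/checkpoints");

        // Reported: prints file:///cluster/checkpoints
        System.out.println(
                resolveCheckpointDir(jobLevelStorageAsReported(envConfig), clusterConfig).get());
        // Expected: prints file:///job/checkpoints
        System.out.println(
                resolveCheckpointDir(jobLevelStorageAsExpected(envConfig), clusterConfig).get());
    }
}
```

In this model the only difference between the two cases is whether the directory from the environment's configuration reaches the job-level setting; the resolution step itself is unchanged.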



--
This message was sent by Atlassian Jira
(v8.3.4#803005)