Posted to dev@flink.apache.org by "Stephen Patel (Jira)" <ji...@apache.org> on 2022/05/25 23:04:00 UTC

[jira] [Created] (FLINK-27787) New tasks think they've been restored from savepoint (even when they weren't present in that savepoint)

Stephen Patel created FLINK-27787:
-------------------------------------

             Summary: New tasks think they've been restored from savepoint (even when they weren't present in that savepoint)
                 Key: FLINK-27787
                 URL: https://issues.apache.org/jira/browse/FLINK-27787
             Project: Flink
          Issue Type: Bug
          Components: Runtime / Checkpointing
    Affects Versions: 1.14.4, 1.14.3, 1.14.2, 1.15.0, 1.14.0
            Reporter: Stephen Patel


I think I've found a bug with new task restoration from savepoints.

I have some Beam-on-Flink pipelines that I restore from savepoints with new source types (sources not present in the DAG that generated the savepoint).

On Flink 1.10.1 (Beam 2.29) this works fine: the DAG spins up with the new sources and starts emitting data.
On Flink 1.14.4 (Beam 2.38) this no longer works. The DAG spins up, but the sources never emit anything.

I checked the beam source wrapper (it maps the beam source to the underlying runner source: https://github.com/apache/beam/blob/v2.38.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L427), but it hasn't changed in several years.

By inserting some logging statements, I was able to determine that on flink 1.10, the source is told that it is NOT restored (FunctionInitializationContext.isRestored() returns false).  With flink 1.14, it is told that it IS restored.
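To illustrate why a wrong isRestored() value could leave a source silent, here is a minimal, self-contained sketch. It is not Flink or Beam code: the class RestoredFlagSketch, the method initialSplits and the split names are hypothetical, and it assumes the source only generates fresh splits when it is told it was not restored.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a source that branches on an isRestored flag.
// This is an illustration only, not the Beam UnboundedSourceWrapper.
class RestoredFlagSketch {

    /** Returns the splits the source would read after initialization. */
    static List<String> initialSplits(boolean isRestored,
                                      List<String> restoredSplits,
                                      List<String> freshSplits) {
        if (isRestored) {
            // Assumption: a restored source trusts its checkpointed state,
            // even when that state is empty.
            return new ArrayList<>(restoredSplits);
        }
        // Fresh start: derive splits from the source definition.
        return new ArrayList<>(freshSplits);
    }

    public static void main(String[] args) {
        List<String> fresh = List.of("split-0", "split-1");
        List<String> emptyState = List.of();

        // New source correctly told it is NOT restored: it gets work to do.
        System.out.println(initialSplits(false, emptyState, fresh)); // [split-0, split-1]

        // New source wrongly told it IS restored: nothing to read.
        System.out.println(initialSplits(true, emptyState, fresh));  // []
    }
}
```

Under that assumption, a source that was never in the savepoint but is told it was restored ends up with no splits, which would match the "never emits anything" symptom.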

By traversing the Flink code changes, I think I've determined that the changes introduced for FLINK-23854 cause the logic in org.apache.flink.runtime.checkpoint.StateAssignmentOperation.java to no longer behave the way it used to.

In 1.13.6, we see [here|https://github.com/apache/flink/blob/release-1.13.6/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java#L221-L240] that if a subtask does not have state, it will not have a JobManagerTaskRestore instance set on it.
In 1.14.0 (and all released versions after that), we see [here|https://github.com/apache/flink/blob/release-1.14.0/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java#L236-L256] that the logic for handling subtasks with no state has been removed. Thus, when restoring from a savepoint, a task will think it has been restored even when it didn't exist (and thus couldn't have state) in the savepoint.
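The difference between the two versions, as I understand it, can be modeled roughly as follows. This is a toy model and not the actual StateAssignmentOperation code: the class, method names and task names are hypothetical, and "receiving a JobManagerTaskRestore" is reduced to a boolean per task.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model of the behavior described above; not Flink code.
class StateAssignmentSketch {

    /** 1.13-style: only tasks that have state in the savepoint get a restore object. */
    static Map<String, Boolean> restoredFlagsSkippingStateless(List<String> tasks,
                                                               Set<String> tasksInSavepoint) {
        Map<String, Boolean> flags = new LinkedHashMap<>();
        for (String task : tasks) {
            // No state in the savepoint means no restore object, so isRestored() is false.
            flags.put(task, tasksInSavepoint.contains(task));
        }
        return flags;
    }

    /** 1.14-style as described in this report: every task gets a restore object. */
    static Map<String, Boolean> restoredFlagsAlwaysAssigning(List<String> tasks,
                                                             Set<String> tasksInSavepoint) {
        Map<String, Boolean> flags = new LinkedHashMap<>();
        for (String task : tasks) {
            // The savepoint contents are ignored, so isRestored() is true for all tasks.
            flags.put(task, true);
        }
        return flags;
    }

    public static void main(String[] args) {
        List<String> tasks = List.of("oldSource", "newSource");
        Set<String> inSavepoint = Set.of("oldSource");

        System.out.println(restoredFlagsSkippingStateless(tasks, inSavepoint));
        // {oldSource=true, newSource=false}
        System.out.println(restoredFlagsAlwaysAssigning(tasks, inSavepoint));
        // {oldSource=true, newSource=true}
    }
}
```

In this model only "newSource" changes between the two variants, which is the case that breaks for me.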



--
This message was sent by Atlassian Jira
(v8.20.7#820007)