Posted to issues@flink.apache.org by "Yanfei Lei (Jira)" <ji...@apache.org> on 2022/11/29 04:04:00 UTC

[jira] [Comment Edited] (FLINK-27787) New tasks think they've been restored from savepoint (even when they weren't present in that savepoint)

    [ https://issues.apache.org/jira/browse/FLINK-27787?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17640369#comment-17640369 ] 

Yanfei Lei edited comment on FLINK-27787 at 11/29/22 4:03 AM:
--------------------------------------------------------------

[~arvid] could you please take a look? I found that [PR17019|https://github.com/apache/flink/pull/17019] has discussed [related possibilities|https://github.com/apache/flink/pull/17019/files#r697712620].


was (Author: yanfei lei):
[~arvid] could you please take a look?  I found that [PR17019|https://github.com/apache/flink/pull/17019] has discussed [related possibilities| [https://github.com/apache/flink/pull/17019/files#r697712620]|https://github.com/apache/flink/pull/17019/files#r697712620],]

> New tasks think they've been restored from savepoint (even when they weren't present in that savepoint)
> -------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-27787
>                 URL: https://issues.apache.org/jira/browse/FLINK-27787
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.14.0, 1.14.2, 1.14.3, 1.14.4, 1.15.0
>            Reporter: Stephen Patel
>            Priority: Major
>
> I think I've found a bug with new task restoration from savepoints.
> I have some Beam-on-Flink pipelines that I restore from savepoints with new source types (sources not present in the DAG that generated the savepoint).
> On Flink 1.10.1 (Beam 2.29) this works fine: the DAG spins up with the new sources and starts emitting data.
> On Flink 1.14.4 (Beam 2.38) this no longer works. The DAG spins up, but the sources never emit anything.
> I checked the beam source wrapper (it maps the beam source to the underlying runner source: https://github.com/apache/beam/blob/v2.38.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L427), but it hasn't changed in several years.
> By inserting some logging statements, I was able to determine that on Flink 1.10, the source is told that it is NOT restored (FunctionInitializationContext.isRestored() returns false). With Flink 1.14, it is told that it IS restored.
> By tracing the Flink code changes, I think I've determined that the changes introduced for FLINK-23854 cause the logic in org.apache.flink.runtime.checkpoint.StateAssignmentOperation.java to no longer behave the way it used to.
> In 1.13.6, we see [here|https://github.com/apache/flink/blob/release-1.13.6/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java#L221-L240] that if a subtask does not have state, it will not have a JobManagerTaskRestore instance set on it, so the FunctionInitializationContext.isRestored() method will return false.
> In 1.14.0 (and all released versions after that), we see [here|https://github.com/apache/flink/blob/release-1.14.0/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java#L236-L256] that the chunk of logic that used to be present for dealing with subtasks which have no state is no longer present.  Thus, when restoring from a savepoint, the task will think it's been restored, even when it didn't exist (and thus couldn't have state) in the savepoint.  
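
The difference described above can be illustrated with a small, self-contained model. This is only a sketch under stated assumptions: the class and method names (RestoreFlagSketch, flagsSkippingStateless, flagsMarkingAll, activeSplits) are hypothetical and are not Flink or Beam APIs, and the source-side rule (a restored source only uses splits found in its state) is an assumption made to mirror the reported symptom, not a copy of the wrapper's code.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Simplified, hypothetical model of the behavior described in the report.
// None of these names are Flink or Beam APIs.
class RestoreFlagSketch {

    // Older behavior as described: an operator with no state in the
    // savepoint gets no restore information, so it sees isRestored == false.
    static Map<String, Boolean> flagsSkippingStateless(
            Set<String> operatorsInSavepoint, List<String> operatorsInJob) {
        Map<String, Boolean> restored = new LinkedHashMap<>();
        for (String op : operatorsInJob) {
            restored.put(op, operatorsInSavepoint.contains(op));
        }
        return restored;
    }

    // Newer behavior as described: every operator is told it was restored,
    // whether or not the savepoint contained it (the set is deliberately unused).
    static Map<String, Boolean> flagsMarkingAll(
            Set<String> operatorsInSavepoint, List<String> operatorsInJob) {
        Map<String, Boolean> restored = new LinkedHashMap<>();
        for (String op : operatorsInJob) {
            restored.put(op, true);
        }
        return restored;
    }

    // Assumed source-side rule: a restored source only reads the splits
    // found in its state; a fresh source creates its own splits.
    static int activeSplits(boolean isRestored, int splitsInState, int freshSplits) {
        return isRestored ? splitsInState : freshSplits;
    }

    public static void main(String[] args) {
        Set<String> savepoint = Set.of("old-source", "sink");
        List<String> job = List.of("old-source", "new-source", "sink");

        boolean oldFlag = flagsSkippingStateless(savepoint, job).get("new-source");
        boolean newFlag = flagsMarkingAll(savepoint, job).get("new-source");

        // The new source has no state in the savepoint (0 splits) and would
        // create 4 splits if it started fresh.
        System.out.println("skip-stateless: splits = " + activeSplits(oldFlag, 0, 4)); // 4
        System.out.println("mark-all: splits = " + activeSplits(newFlag, 0, 4));       // 0
    }
}
```

In this model, the new source ends up with zero splits once it is marked as restored, which is consistent with the "sources never emit anything" symptom reported above.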


