Posted to issues@flink.apache.org by "Yanfei Lei (Jira)" <ji...@apache.org> on 2022/11/29 04:04:00 UTC
[jira] [Comment Edited] (FLINK-27787) New tasks think they've been restored from savepoint (even when they weren't present in that savepoint)
[ https://issues.apache.org/jira/browse/FLINK-27787?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17640369#comment-17640369 ]
Yanfei Lei edited comment on FLINK-27787 at 11/29/22 4:03 AM:
--------------------------------------------------------------
[~arvid] could you please take a look? I found that [PR17019|https://github.com/apache/flink/pull/17019] has discussed [related possibilities|https://github.com/apache/flink/pull/17019/files#r697712620].
was (Author: yanfei lei):
[~arvid] could you please take a look? I found that [PR17019|https://github.com/apache/flink/pull/17019] has discussed [related possibilities| [https://github.com/apache/flink/pull/17019/files#r697712620]|https://github.com/apache/flink/pull/17019/files#r697712620],]
> New tasks think they've been restored from savepoint (even when they weren't present in that savepoint)
> -------------------------------------------------------------------------------------------------------
>
> Key: FLINK-27787
> URL: https://issues.apache.org/jira/browse/FLINK-27787
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Checkpointing
> Affects Versions: 1.14.0, 1.14.2, 1.14.3, 1.14.4, 1.15.0
> Reporter: Stephen Patel
> Priority: Major
>
> I think I've found a bug with new task restoration from savepoints.
> I have some Beam-on-Flink pipelines that I restore from savepoints with new source types (sources not present in the DAG that generated the savepoint).
> On Flink 1.10.1 (Beam 2.29) this works fine: the DAG spins up with the new sources and starts emitting data.
> On Flink 1.14.4 (Beam 2.38) this no longer works. The DAG spins up, but the sources never emit anything.
> I checked the Beam source wrapper (it maps the Beam source to the underlying runner source: https://github.com/apache/beam/blob/v2.38.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L427), but it hasn't changed in several years.
> By inserting some logging statements, I was able to determine that on Flink 1.10 the source is told that it is NOT restored (FunctionInitializationContext.isRestored() returns false). On Flink 1.14, it is told that it IS restored.
> By tracing the Flink code changes, I think I've determined that the changes introduced for FLINK-23854 cause the logic in org.apache.flink.runtime.checkpoint.StateAssignmentOperation to no longer behave the way it used to.
> In 1.13.6, we see [here|https://github.com/apache/flink/blob/release-1.13.6/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java#L221-L240] that if a subtask does not have state, it will not have a JobManagerTaskRestore instance set on it, so the FunctionInitializationContext.isRestored() method will return false.
> In 1.14.0 (and all versions released after that), we see [here|https://github.com/apache/flink/blob/release-1.14.0/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java#L236-L256] that the logic that used to handle subtasks with no state has been removed. Thus, when restoring from a savepoint, the task will think it's been restored, even when it didn't exist (and thus couldn't have state) in the savepoint.
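
The behaviour change described in the report can be illustrated with a small, self-contained Java sketch. This is not Flink or Beam code: the class RestoreFlagSketch, the TaskRestore type, and every method name below are hypothetical stand-ins, and the source-side logic (only creating fresh splits when not restored) is an assumption about how a wrapper like the one linked above might react to the flag. The sketch only models the claim that a subtask without state used to receive no restore object, and now receives one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

public class RestoreFlagSketch {

    /** Hypothetical stand-in for the restore information handed to one subtask. */
    static final class TaskRestore {
        final List<String> state;

        TaskRestore(List<String> state) {
            this.state = state;
        }
    }

    /** Models the 1.13-style behaviour from the report: no state means no restore object. */
    static Optional<TaskRestore> assignSkippingStateless(List<String> subtaskState) {
        if (subtaskState.isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(new TaskRestore(subtaskState));
    }

    /** Models the 1.14-style behaviour from the report: a restore object is always set. */
    static Optional<TaskRestore> assignAlways(List<String> subtaskState) {
        return Optional.of(new TaskRestore(subtaskState));
    }

    /** In this model the task counts as restored whenever a restore object is present. */
    static boolean isRestored(Optional<TaskRestore> restore) {
        return restore.isPresent();
    }

    /**
     * Assumed source-side logic: a restored source reuses the splits found in
     * state, and only a non-restored source creates fresh splits.
     */
    static List<String> initializeSplits(Optional<TaskRestore> restore, List<String> freshSplits) {
        if (isRestored(restore)) {
            return new ArrayList<>(restore.get().state);
        }
        return new ArrayList<>(freshSplits);
    }

    public static void main(String[] args) {
        // A source that did not exist in the savepoint has no state at all.
        List<String> noState = List.of();
        List<String> fresh = List.of("split-0", "split-1");

        Optional<TaskRestore> oldStyle = assignSkippingStateless(noState);
        Optional<TaskRestore> newStyle = assignAlways(noState);

        System.out.println("old isRestored: " + isRestored(oldStyle));
        System.out.println("old splits:     " + initializeSplits(oldStyle, fresh));
        System.out.println("new isRestored: " + isRestored(newStyle));
        System.out.println("new splits:     " + initializeSplits(newStyle, fresh));
    }
}
```

In this model the old assignment leaves the new source with its fresh splits, while the new assignment marks it as restored with an empty split list, which would match the reported symptom of sources that start but never emit anything.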