Posted to dev@flink.apache.org by "Colin Smetz (Jira)" <ji...@apache.org> on 2022/08/08 07:12:00 UTC

[jira] [Created] (FLINK-28861) Cannot resume from savepoint when using fromChangelogStream in upsert mode

Colin Smetz created FLINK-28861:
-----------------------------------

             Summary: Cannot resume from savepoint when using fromChangelogStream in upsert mode
                 Key: FLINK-28861
                 URL: https://issues.apache.org/jira/browse/FLINK-28861
             Project: Flink
          Issue Type: Bug
          Components: Runtime / Checkpointing
    Affects Versions: 1.15.1
            Reporter: Colin Smetz


I want to use the savepoint mechanism to move existing jobs from one version of Flink to another, by:
 # Stopping a job with a savepoint
 # Creating a new job from the savepoint, on the new version.

In Flink 1.15.1 this fails, even when going from 1.15.1 to 1.15.1. I get the error below, meaning that the state from the previous job could not be mapped to the new one because of one operator:

{{Failed to rollback to checkpoint/savepoint hdfs://hdfs-name:8020/flink-savepoints/savepoint-046708-238e921f5e78. Cannot map checkpoint/savepoint state for operator d14a399e92154660771a806b90515d4c to the new program, because the operator is not available in the new program.}}
After investigation, the problematic operator corresponds to a {{ChangelogNormalize}} operator that I do not explicitly create. It is generated because I use [{{tableEnv.fromChangelogStream(stream, schema, ChangelogMode.upsert())}}|https://nightlies.apache.org/flink/flink-docs-release-1.15/api/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.html#fromChangelogStream-org.apache.flink.streaming.api.datastream.DataStream-org.apache.flink.table.api.Schema-org.apache.flink.table.connector.ChangelogMode-] (the upsert mode is important; other modes do not fail). The resulting table is passed to an SQL query through the SQL API, which generates something like:

{{ChangelogNormalize[8] -> Calc[9] -> TableToDataStream -> [my_sql_transformation] -> [my_sink]}}
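
For reference, here is a minimal, self-contained sketch of this kind of setup (field names, values and the query are hypothetical, modelled on the fromChangelogStream documentation rather than copied from my actual job); printing {{explain()}} on a query over the table shows the implicit {{ChangelogNormalize}} step in the generated plan:

{code:java}
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

public class UpsertChangelogExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // A hypothetical upsert changelog: f0 is the upsert key, no UPDATE_BEFORE rows.
        DataStream<Row> stream =
                env.fromElements(
                                Row.ofKind(RowKind.INSERT, "Alice", 12),
                                Row.ofKind(RowKind.INSERT, "Bob", 5),
                                Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 100))
                        .returns(
                                Types.ROW_NAMED(
                                        new String[] {"f0", "f1"},
                                        Types.STRING, Types.INT));

        // Upsert mode requires a primary key; interpreting the stream this way is
        // what makes the planner insert the implicit ChangelogNormalize operator.
        Table table =
                tableEnv.fromChangelogStream(
                        stream,
                        Schema.newBuilder().primaryKey("f0").build(),
                        ChangelogMode.upsert());

        tableEnv.createTemporaryView("input_table", table);

        // The plan of any query over the view shows the generated
        // ChangelogNormalize -> Calc -> ... chain.
        Table result = tableEnv.sqlQuery("SELECT f0, f1 FROM input_table");
        System.out.println(result.explain());
    }
}
{code}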
In previous versions of Flink, it seems this operator was always given the same uid, so the state could be matched when starting from the savepoint. In Flink 1.15.1, I see that a different uid is generated every time. I could not find a reliable way to set that uid manually. The only way I found was to go backwards from the transformation:

{{dataStream.getTransformation().getInputs().get(0).getInputs().get(0).getInputs().get(0).setUid("the_user_defined_id");}}
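
For completeness, a slightly less brittle form of the same workaround, as a sketch: walk the transformation graph backwards and pin a fixed uid on the first transformation whose name contains "ChangelogNormalize". The helper and the name-matching heuristic are mine and purely illustrative; it only relies on {{Transformation#getInputs}}, {{Transformation#getName}} and {{Transformation#setUid}}, and the operator name is an internal planner detail that may change between versions.

{code:java}
import java.util.ArrayDeque;
import java.util.Deque;

import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.datastream.DataStream;

public final class UidWorkaround {

    /**
     * Walks the transformation graph backwards from the given stream and assigns
     * the given uid to the first transformation whose name contains nameMarker
     * (e.g. "ChangelogNormalize"). Returns true if such a transformation was found.
     */
    public static boolean pinUid(DataStream<?> dataStream, String nameMarker, String uid) {
        Deque<Transformation<?>> toVisit = new ArrayDeque<>();
        toVisit.push(dataStream.getTransformation());
        while (!toVisit.isEmpty()) {
            Transformation<?> current = toVisit.pop();
            if (current.getName().contains(nameMarker)) {
                current.setUid(uid);
                return true;
            }
            toVisit.addAll(current.getInputs());
        }
        return false;
    }

    private UidWorkaround() {}
}
{code}

Used e.g. as {{pinUid(dataStream, "ChangelogNormalize", "the_user_defined_id")}} before the job is submitted; it is essentially the same hack as above, just without hard-coding the exact depth of the input chain.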



--
This message was sent by Atlassian Jira
(v8.20.10#820010)