You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Martijn Visser (Jira)" <ji...@apache.org> on 2022/07/21 12:06:00 UTC

[jira] [Commented] (FLINK-28622) Can't restore a flink job that uses Table API and Kafka connector with savepoint

    [ https://issues.apache.org/jira/browse/FLINK-28622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17569380#comment-17569380 ] 

Martijn Visser commented on FLINK-28622:
----------------------------------------

[~renqs] Any idea why this would fail? I'm under the impression that if the Flink version, the SQL statement and nothing else changes, savepoints should be supported?

> Can't restore a flink job that uses Table API and Kafka connector with savepoint
> --------------------------------------------------------------------------------
>
>                 Key: FLINK-28622
>                 URL: https://issues.apache.org/jira/browse/FLINK-28622
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.15.0
>            Reporter: Nathan
>            Priority: Critical
>
> I canceled a flink job with a savepoint, then tried to restore the job with the savepoint (just using the same jar file) but it said it cannot map savepoint state. I was just using the same jar file so I think the execution plan and generated operator ID should be the same? (Flink version has not been changed)
>  
> Related errors:
> {code:java}
> used by: java.util.concurrent.CompletionException: java.lang.IllegalStateException: Failed to rollback to checkpoint/savepoint file:/root/flink-savepoints/savepoint-5f285c-c2749410db07. Cannot map checkpoint/savepoint state for operator dd5fc1f28f42d777f818e2e8ea18c331 to the new program, because the operator is not available in the new program. If you want to allow to skip this, you can set the --allowNonRestoredState option on the CLI.
> used by: java.lang.IllegalStateException: Failed to rollback to checkpoint/savepoint file:/root/flink-savepoints/savepoint-5f285c-c2749410db07. Cannot map checkpoint/savepoint state for operator dd5fc1f28f42d777f818e2e8ea18c331 to the new program, because the operator is not available in the new program. If you want to allow to skip this, you can set the --allowNonRestoredState option on the CLI. {code}
> My code:
> {code:java}
> public final class FlinkJob {
>     public static void main(String[] args) {
>         final String JOB_NAME = "FlinkJob";
>         final EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
>         final TableEnvironment tEnv = TableEnvironment.create(settings);
>         tEnv.getConfig().set("pipeline.name", JOB_NAME);
>         tEnv.getConfig().setLocalTimeZone(ZoneId.of("UTC"));
>         tEnv.executeSql("CREATE TEMPORARY TABLE ApiLog (" +
>                 "  `_timestamp` TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL," +
>                 "  `_partition` INT METADATA FROM 'partition' VIRTUAL," +
>                 "  `_offset` BIGINT METADATA FROM 'offset' VIRTUAL," +
>                 "  `Data` STRING," +
>                 "  `Action` STRING," +
>                 "  `ProduceDateTime` TIMESTAMP_LTZ(6)," +
>                 "  `OffSet` INT" +
>                 ") WITH (" +
>                 "  'connector' = 'kafka'," +
>                 "  'topic' = 'api.log'," +
>                 "  'properties.group.id' = 'flink'," +
>                 "  'properties.bootstrap.servers' = '<mykafkahost...>'," +
>                 "  'format' = 'json'," +
>                 "  'json.timestamp-format.standard' = 'ISO-8601'" +
>                 ")");
>         tEnv.executeSql("CREATE TABLE print_table (" +
>                 " `_timestamp` TIMESTAMP(3)," +
>                 " `_partition` INT," +
>                 " `_offset` BIGINT," +
>                 " `Data` STRING," +
>                 " `Action` STRING," +
>                 " `ProduceDateTime` TIMESTAMP(6)," +
>                 " `OffSet` INT" +
>                 ") WITH ('connector' = 'print')");
>         tEnv.executeSql("INSERT INTO print_table" +
>                 " SELECT * FROM ApiLog");
>     }
> } {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)