Posted to issues@flink.apache.org by "Nathan (Jira)" <ji...@apache.org> on 2022/07/21 09:53:00 UTC

[jira] [Created] (FLINK-28622) Can't restore a flink job that uses Table API and Kafka connector with savepoint

Nathan created FLINK-28622:
------------------------------

             Summary: Can't restore a flink job that uses Table API and Kafka connector with savepoint
                 Key: FLINK-28622
                 URL: https://issues.apache.org/jira/browse/FLINK-28622
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / API
    Affects Versions: 1.15.0
            Reporter: Nathan


I canceled a Flink job with a savepoint and then tried to restore the job from that savepoint using the same jar file, but the restore failed because the savepoint state could not be mapped to the new program. The jar file and the Flink version are both unchanged, so shouldn't the execution plan and the generated operator IDs be the same?
 
Related errors:
{code:java}
Caused by: java.util.concurrent.CompletionException: java.lang.IllegalStateException: Failed to rollback to checkpoint/savepoint file:/root/flink-savepoints/savepoint-5f285c-c2749410db07. Cannot map checkpoint/savepoint state for operator dd5fc1f28f42d777f818e2e8ea18c331 to the new program, because the operator is not available in the new program. If you want to allow to skip this, you can set the --allowNonRestoredState option on the CLI.

Caused by: java.lang.IllegalStateException: Failed to rollback to checkpoint/savepoint file:/root/flink-savepoints/savepoint-5f285c-c2749410db07. Cannot map checkpoint/savepoint state for operator dd5fc1f28f42d777f818e2e8ea18c331 to the new program, because the operator is not available in the new program. If you want to allow to skip this, you can set the --allowNonRestoredState option on the CLI. {code}
My code:
{code:java}
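import java.time.ZoneId;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public final class FlinkJob {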

    public static void main(String[] args) {

        final String JOB_NAME = "FlinkJob";

        final EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
        final TableEnvironment tEnv = TableEnvironment.create(settings);
        tEnv.getConfig().set("pipeline.name", JOB_NAME);
        tEnv.getConfig().setLocalTimeZone(ZoneId.of("UTC"));

        tEnv.executeSql("CREATE TEMPORARY TABLE ApiLog (" +
                "  `_timestamp` TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL," +
                "  `_partition` INT METADATA FROM 'partition' VIRTUAL," +
                "  `_offset` BIGINT METADATA FROM 'offset' VIRTUAL," +
                "  `Data` STRING," +
                "  `Action` STRING," +
                "  `ProduceDateTime` TIMESTAMP_LTZ(6)," +
                "  `OffSet` INT" +
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = 'api.log'," +
                "  'properties.group.id' = 'flink'," +
                "  'properties.bootstrap.servers' = '<mykafkahost...>'," +
                "  'format' = 'json'," +
                "  'json.timestamp-format.standard' = 'ISO-8601'" +
                ")");

        tEnv.executeSql("CREATE TABLE print_table (" +
                " `_timestamp` TIMESTAMP(3)," +
                " `_partition` INT," +
                " `_offset` BIGINT," +
                " `Data` STRING," +
                " `Action` STRING," +
                " `ProduceDateTime` TIMESTAMP(6)," +
                " `OffSet` INT" +
                ") WITH ('connector' = 'print')");

        tEnv.executeSql("INSERT INTO print_table" +
                " SELECT * FROM ApiLog");

    }

} {code}
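
For reference, the check that the error message describes can be sketched in plain Java. This is only an illustration of the behaviour stated in the message (every operator ID in the savepoint must exist in the new program unless --allowNonRestoredState is set), not Flink's actual implementation. The class name, method names, and the second operator ID are hypothetical.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical sketch of the restore-time check, not Flink code.
public final class RestoreCheckSketch {

    // Returns the savepoint operator IDs that have no counterpart in the new program.
    static Set<String> unmatchedOperators(Set<String> savepointOperatorIds,
                                          Set<String> programOperatorIds) {
        Set<String> unmatched = new LinkedHashSet<>(savepointOperatorIds);
        unmatched.removeAll(programOperatorIds);
        return unmatched;
    }

    // Fails when savepoint state cannot be mapped, unless skipping is explicitly allowed.
    static void checkRestore(Set<String> savepointOperatorIds,
                             Set<String> programOperatorIds,
                             boolean allowNonRestoredState) {
        Set<String> unmatched = unmatchedOperators(savepointOperatorIds, programOperatorIds);
        if (!unmatched.isEmpty() && !allowNonRestoredState) {
            throw new IllegalStateException(
                    "Cannot map savepoint state for operators " + unmatched
                            + " to the new program");
        }
    }

    public static void main(String[] args) {
        // Operator ID taken from the error message above.
        Set<String> savepointIds = Set.of("dd5fc1f28f42d777f818e2e8ea18c331");
        // Hypothetical ID standing in for whatever the resubmitted job generated.
        Set<String> programIds = Set.of("00000000000000000000000000000000");

        try {
            checkRestore(savepointIds, programIds, false);
        } catch (IllegalStateException e) {
            System.out.println("Restore rejected: " + e.getMessage());
        }

        // With the flag set, the unmatched state is skipped instead of failing the restore.
        checkRestore(savepointIds, programIds, true);
        System.out.println("Restore accepted with allowNonRestoredState");
    }
}
```

Under this model, the failure above means the resubmitted job produced at least one operator ID that differs from the one recorded in the savepoint, even though the jar file was the same.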


