Posted to dev@flink.apache.org by "Matthias Pohl (Jira)" <ji...@apache.org> on 2023/02/20 16:44:00 UTC

[jira] [Created] (FLINK-31143) Invalid request: offset doesn't match when restarting from a savepoint

Matthias Pohl created FLINK-31143:
-------------------------------------

             Summary: Invalid request: offset doesn't match when restarting from a savepoint
                 Key: FLINK-31143
                 URL: https://issues.apache.org/jira/browse/FLINK-31143
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Runtime
    Affects Versions: 1.16.1, 1.15.3, 1.17.0
            Reporter: Matthias Pohl


I tried to run the following case:
{code:java}
    public static void main(String[] args) throws Exception {
        final String createTableQuery =
                "CREATE TABLE left_table (a int, c varchar) "
                        + "WITH ("
                        + "     'connector' = 'datagen', "
                        + "     'rows-per-second' = '1', "
                        + "     'fields.a.kind' = 'sequence', "
                        + "     'fields.a.start' = '0', "
                        + "     'fields.a.end' = '100000'"
                        + ");";
        final String selectQuery = "SELECT * FROM left_table;";

        final Configuration initialConfig = new Configuration();
        initialConfig.set(CoreOptions.DEFAULT_PARALLELISM, 1);

        final EnvironmentSettings initialSettings =
                EnvironmentSettings.newInstance()
                        .inStreamingMode()
                        .withConfiguration(initialConfig)
                        .build();
        final TableEnvironment initialTableEnv = TableEnvironment.create(initialSettings);

        initialTableEnv.executeSql(createTableQuery);
        final TableResult tableResult = initialTableEnv.sqlQuery(selectQuery).execute();
        tableResult.await();

        final String savepointPath;
        try (CloseableIterator<Row> tableResultIterator = tableResult.collect()) {
            // consume two results
            System.out.println(tableResultIterator.next());
            System.out.println(tableResultIterator.next());

            final JobClient jobClient =
                    tableResult.getJobClient().orElseThrow(IllegalStateException::new);

            final File savepointDirectory = Files.createTempDir();
            savepointPath =
                    jobClient
                            .stopWithSavepoint(
                                    true,
                                    savepointDirectory.getAbsolutePath(),
                                    SavepointFormatType.CANONICAL)
                            .get();
        }

        final SavepointRestoreSettings savepointRestoreSettings =
                SavepointRestoreSettings.forPath(savepointPath, true);
        final Configuration restartConfig = new Configuration(initialConfig);
        SavepointRestoreSettings.toConfiguration(savepointRestoreSettings, restartConfig);

        final EnvironmentSettings restartSettings =
                EnvironmentSettings.newInstance()
                        .inStreamingMode()
                        .withConfiguration(restartConfig)
                        .build();
        final TableEnvironment restartTableEnv = TableEnvironment.create(restartSettings);

        restartTableEnv.executeSql(createTableQuery);
        restartTableEnv.sqlQuery(selectQuery).execute().print();
    }
{code}

Expected behavior: The job continues, omitting the initial two records, and starts printing results from 2 onwards.

Observed behavior:
No results are printed. The logs show that an invalid request was handled:
{code}
org.apache.flink.streaming.api.operators.collect.CollectSinkFunction [] - Invalid request. Received version = b497a74f-e85c-404b-8df3-1b4b1a0c2de1, offset = 0, while expected version = b497a74f-e85c-404b-8df3-1b4b1a0c2de1, offset = 1
{code}
It looks like the right offset is not picked up from the savepoint (see [CollectSinkFunction:411|https://github.com/apache/flink/blob/5ae8cb0503449b07f76d0ab621c3e81734496b26/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunction.java#L411]).
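
To make the log line above easier to read, here is a minimal sketch of the kind of check that would produce it. This is not Flink's actual code: the class {{CollectRequestCheck}} and the method {{isValid}} are hypothetical names, and the rule itself (reject a request if the version differs or if the requested offset is lower than the offset the sink expects) is an assumption that is merely consistent with the logged values.

```java
import java.util.Objects;

/** Simplified, hypothetical model of a version/offset request check; NOT Flink's implementation. */
final class CollectRequestCheck {

    /**
     * Assumed rule: a request is accepted only if the versions match and the
     * requested offset is not behind the offset the sink expects.
     */
    static boolean isValid(
            String requestVersion, long requestOffset, String expectedVersion, long expectedOffset) {
        return Objects.equals(requestVersion, expectedVersion) && requestOffset >= expectedOffset;
    }

    public static void main(String[] args) {
        // Version and offsets taken from the log line in this report.
        final String version = "b497a74f-e85c-404b-8df3-1b4b1a0c2de1";

        // Same version, but the request asks for offset 0 while offset 1 is expected.
        System.out.println("request(offset=0) valid: " + isValid(version, 0L, version, 1L));

        // A request that uses the expected offset passes the assumed check.
        System.out.println("request(offset=1) valid: " + isValid(version, 1L, version, 1L));
    }
}
```

Under this assumption, the versions in the log already agree and only the offset is off, which would fit the suspicion that the offset is not restored consistently from the savepoint.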


