Posted to dev@flink.apache.org by "Matthias Pohl (Jira)" <ji...@apache.org> on 2023/02/20 16:44:00 UTC
[jira] [Created] (FLINK-31143) Invalid request: offset doesn't match when restarting from a savepoint
Matthias Pohl created FLINK-31143:
-------------------------------------
Summary: Invalid request: offset doesn't match when restarting from a savepoint
Key: FLINK-31143
URL: https://issues.apache.org/jira/browse/FLINK-31143
Project: Flink
Issue Type: Bug
Components: Table SQL / Runtime
Affects Versions: 1.16.1, 1.15.3, 1.17.0
Reporter: Matthias Pohl
I tried to run the following case:
{code:java}
public static void main(String[] args) throws Exception {
    final String createTableQuery =
            "CREATE TABLE left_table (a int, c varchar) "
                    + "WITH ("
                    + " 'connector' = 'datagen', "
                    + " 'rows-per-second' = '1', "
                    + " 'fields.a.kind' = 'sequence', "
                    + " 'fields.a.start' = '0', "
                    + " 'fields.a.end' = '100000'"
                    + ");";
    final String selectQuery = "SELECT * FROM left_table;";

    final Configuration initialConfig = new Configuration();
    initialConfig.set(CoreOptions.DEFAULT_PARALLELISM, 1);
    final EnvironmentSettings initialSettings =
            EnvironmentSettings.newInstance()
                    .inStreamingMode()
                    .withConfiguration(initialConfig)
                    .build();
    final TableEnvironment initialTableEnv = TableEnvironment.create(initialSettings);
    initialTableEnv.executeSql(createTableQuery);
    final TableResult tableResult = initialTableEnv.sqlQuery(selectQuery).execute();
    tableResult.await();

    final String savepointPath;
    try (CloseableIterator<Row> tableResultIterator = tableResult.collect()) {
        // consume two results
        System.out.println(tableResultIterator.next());
        System.out.println(tableResultIterator.next());

        final JobClient jobClient =
                tableResult.getJobClient().orElseThrow(IllegalStateException::new);
        final File savepointDirectory = Files.createTempDir();
        savepointPath =
                jobClient
                        .stopWithSavepoint(
                                true,
                                savepointDirectory.getAbsolutePath(),
                                SavepointFormatType.CANONICAL)
                        .get();
    }

    final SavepointRestoreSettings savepointRestoreSettings =
            SavepointRestoreSettings.forPath(savepointPath, true);
    final Configuration restartConfig = new Configuration(initialConfig);
    SavepointRestoreSettings.toConfiguration(savepointRestoreSettings, restartConfig);
    final EnvironmentSettings restartSettings =
            EnvironmentSettings.newInstance()
                    .inStreamingMode()
                    .withConfiguration(restartConfig)
                    .build();
    final TableEnvironment restartTableEnv = TableEnvironment.create(restartSettings);
    restartTableEnv.executeSql(createTableQuery);
    restartTableEnv.sqlQuery(selectQuery).execute().print();
}
{code}
Expected behavior: The job resumes from the savepoint, omits the initial two records, and starts printing results from 2 onwards.
Observed behavior:
No results are printed. The logs show that an invalid request was handled:
{code}
org.apache.flink.streaming.api.operators.collect.CollectSinkFunction [] - Invalid request. Received version = b497a74f-e85c-404b-8df3-1b4b1a0c2de1, offset = 0, while expected version = b497a74f-e85c-404b-8df3-1b4b1a0c2de1, offset = 1
{code}
It looks like the right offset is not picked up from the savepoint (see [CollectSinkFunction:411|https://github.com/apache/flink/blob/5ae8cb0503449b07f76d0ab621c3e81734496b26/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunction.java#L411]).
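To illustrate the mismatch in the log line above, here is a minimal sketch of a version/offset check. It is a simplified model for discussion, not the actual CollectSinkFunction code: the class name OffsetCheckSketch, the method isValidRequest, and the exact rejection rule (version differs, or the requested offset is behind the expected one) are assumptions.

```java
import java.util.Objects;

/** Hypothetical, simplified model of a version/offset request check. Not Flink code. */
public class OffsetCheckSketch {

    /**
     * Assumed rule: a request is accepted only if the versions match and the
     * requested offset is not behind the offset the sink expects.
     */
    public static boolean isValidRequest(
            String requestVersion,
            long requestOffset,
            String expectedVersion,
            long expectedOffset) {
        return Objects.equals(requestVersion, expectedVersion)
                && requestOffset >= expectedOffset;
    }

    public static void main(String[] args) {
        // Version string and offsets taken from the log line in this report.
        final String version = "b497a74f-e85c-404b-8df3-1b4b1a0c2de1";

        // Client asks for offset 0 while the sink expects offset 1: rejected.
        System.out.println(isValidRequest(version, 0L, version, 1L)); // prints "false"

        // If the client had requested offset 1, the request would pass this check.
        System.out.println(isValidRequest(version, 1L, version, 1L)); // prints "true"
    }
}
```

Under this model, the versions in the log are identical, so only the offset comparison can cause the rejection. That is consistent with the suspicion that one side does not pick up the right offset from the savepoint.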