Posted to dev@flink.apache.org by "Denis Golovachev (Jira)" <ji...@apache.org> on 2022/12/27 17:21:00 UTC

[jira] [Created] (FLINK-30514) HybridSource savepoint recovery sequence

Denis Golovachev created FLINK-30514:
----------------------------------------

             Summary: HybridSource savepoint recovery sequence
                 Key: FLINK-30514
                 URL: https://issues.apache.org/jira/browse/FLINK-30514
             Project: Flink
          Issue Type: Bug
          Components: Connectors / FileSystem
    Affects Versions: 1.15.3, 1.15.2, 1.16.0
            Reporter: Denis Golovachev


During recovery, {{org.apache.flink.connector.base.source.hybrid.HybridSourceReader}} accumulates the restored splits in

{{org.apache.flink.connector.base.source.hybrid.HybridSourceReader#restoredSplits}}.

As a next step, it creates the underlying reader and pushes all {{restoredSplits}} to that reader in
{{org.apache.flink.connector.base.source.hybrid.HybridSourceReader#setCurrentReader}}.

The call sequence in {{setCurrentReader}} is as follows:
 - {{reader.start()}}
 - {{reader.addSplits()}}

This does not seem to work when {{FileSourceReader}} is the underlying reader.


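The {{FileSourceReader#start()}} method checks whether any splits are already assigned and calls {{sendSplitRequest}} if there are none. The current {{HybridSourceReader}} recovery sequence is not aligned with this: when {{start()}} runs, the restored splits have not been added yet, so the reader always requests a new split.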
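
To illustrate the ordering problem, here is a minimal sketch that does not use Flink at all. {{ModelFileReader}} and {{RecoveryOrderDemo}} are hypothetical names; the model only mirrors the behaviour described above (request a split on {{start()}} when nothing is assigned) and is not the real {{FileSourceReader}}.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for FileSourceReader. It models only the behaviour
// described in this issue and is NOT the real Flink class.
class ModelFileReader {
    private final List<String> assignedSplits = new ArrayList<>();
    private int splitRequestsSent = 0;

    // Asks the enumerator for work only if nothing has been assigned yet.
    void start() {
        if (assignedSplits.isEmpty()) {
            splitRequestsSent++; // stands in for sendSplitRequest
        }
    }

    void addSplits(List<String> splits) {
        assignedSplits.addAll(splits);
    }

    int splitRequestsSent() {
        return splitRequestsSent;
    }
}

class RecoveryOrderDemo {
    // Order currently used during recovery: start() first, then addSplits().
    static int requestsWithStartFirst(List<String> restoredSplits) {
        ModelFileReader reader = new ModelFileReader();
        reader.start();                   // no splits yet, so a request goes out
        reader.addSplits(restoredSplits); // restored splits arrive too late
        return reader.splitRequestsSent();
    }

    public static void main(String[] args) {
        List<String> restored = List.of("1000000/part-00000", "1000000/part-00001");
        System.out.println("split requests sent: " + requestsWithStartFirst(restored));
    }
}
```

In this model, one split request is sent even though restored splits exist, which would match the enumerator assigning splits from the next bucket right after recovery.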

So, every time we recover, we immediately jump to the next splits.
The logs below show this. In this experiment, the job should have resumed with the files inside the 1000000 bucket, but it jumped to bucket 2000000.
Job Manager
{code:java}
2022-12-27 13:38:47.155 StaticFileSplitEnumerator  - Assigned split to subtask 1 : FileSourceSplit: s3a://bucket/2000000/part-00001-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97489087)  hosts=[localhost] ID=0000000032 position=null
2022-12-27 13:38:47.156 StaticFileSplitEnumerator  - Assigned split to subtask 9 : FileSourceSplit: s3a://bucket/2000000/part-00002-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97342071)  hosts=[localhost] ID=0000000033 position=null
2022-12-27 13:38:47.156 StaticFileSplitEnumerator  - Assigned split to subtask 6 : FileSourceSplit: s3a://bucket/2000000/part-00000-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97377047)  hosts=[localhost] ID=0000000031 position=null
2022-12-27 13:38:47.157 StaticFileSplitEnumerator  - Assigned split to subtask 5 : FileSourceSplit: s3a://bucket/2000000/part-00003-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97406878)  hosts=[localhost] ID=0000000034 position=null
2022-12-27 13:38:47.157 StaticFileSplitEnumerator  - Assigned split to subtask 2 : FileSourceSplit: s3a://bucket/2000000/part-00009-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97536205)  hosts=[localhost] ID=0000000040 position=null
2022-12-27 13:38:47.157 StaticFileSplitEnumerator  - Assigned split to subtask 4 : FileSourceSplit: s3a://bucket/2000000/part-00004-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97420601)  hosts=[localhost] ID=0000000035 position=null
2022-12-27 13:38:47.157 StaticFileSplitEnumerator  - Assigned split to subtask 8 : FileSourceSplit: s3a://bucket/2000000/part-00005-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97472256)  hosts=[localhost] ID=0000000036 position=null
2022-12-27 13:38:47.157 StaticFileSplitEnumerator  - Assigned split to subtask 3 : FileSourceSplit: s3a://bucket/2000000/part-00006-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97495880)  hosts=[localhost] ID=0000000037 position=null
2022-12-27 13:38:47.157 StaticFileSplitEnumerator  - Assigned split to subtask 0 : FileSourceSplit: s3a://bucket/2000000/part-00007-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97389425)  hosts=[localhost] ID=0000000038 position=null
2022-12-27 13:38:47.158 StaticFileSplitEnumerator  - Assigned split to subtask 7 : FileSourceSplit: s3a://bucket/2000000/part-00008-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97428709)  hosts=[localhost] ID=0000000039 position=null
{code}
Task Manager
{code:java}
2246:2022-12-27 13:38:47.110 SourceReaderBase  - Adding split(s) to reader: [FileSourceSplit: s3a://bucket/1000000/part-00007-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet [0, 79887236)  hosts=[localhost] ID=0000000018 position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226029]
2247:2022-12-27 13:38:47.110 SourceReaderBase  - Adding split(s) to reader: [FileSourceSplit: s3a://bucket/1000000/part-00000-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet [0, 79987191)  hosts=[localhost] ID=0000000011 position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226030]
2248:2022-12-27 13:38:47.110 SourceReaderBase  - Adding split(s) to reader: [FileSourceSplit: s3a://bucket/1000000/part-00009-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet [0, 80247830)  hosts=[localhost] ID=0000000020 position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226535]
2249:2022-12-27 13:38:47.110 SourceReaderBase  - Adding split(s) to reader: [FileSourceSplit: s3a://bucket/1000000/part-00004-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet [0, 80055663)  hosts=[localhost] ID=0000000015 position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226712]
2250:2022-12-27 13:38:47.110 SourceReaderBase  - Adding split(s) to reader: [FileSourceSplit: s3a://bucket/1000000/part-00005-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet [0, 80022187)  hosts=[localhost] ID=0000000016 position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226346]
2251:2022-12-27 13:38:47.110 SourceReaderBase  - Adding split(s) to reader: [FileSourceSplit: s3a://bucket/1000000/part-00006-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet [0, 80109242)  hosts=[localhost] ID=0000000017 position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=227284]
2252:2022-12-27 13:38:47.110 SourceReaderBase  - Adding split(s) to reader: [FileSourceSplit: s3a://bucket/1000000/part-00001-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet [0, 79980911)  hosts=[localhost] ID=0000000012 position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226429]
2253:2022-12-27 13:38:47.110 SourceReaderBase  - Adding split(s) to reader: [FileSourceSplit: s3a://bucket/1000000/part-00003-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet [0, 79996693)  hosts=[localhost] ID=0000000014 position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=227154]
2254:2022-12-27 13:38:47.110 SourceReaderBase  - Adding split(s) to reader: [FileSourceSplit: s3a://bucket/1000000/part-00002-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet [0, 80040476)  hosts=[localhost] ID=0000000013 position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=225920]
2255:2022-12-27 13:38:47.110 SourceReaderBase  - Adding split(s) to reader: [FileSourceSplit: s3a://bucket/1000000/part-00008-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet [0, 79986997)  hosts=[localhost] ID=0000000019 position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226278]
2265:2022-12-27 13:38:47.115 FileSourceSplitReader  - Handling split change SplitAddition:[[FileSourceSplit: s3a://bucket/1000000/part-00006-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet [0, 80109242)  hosts=[localhost] ID=0000000017 position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=227284]]
2266:2022-12-27 13:38:47.115 FileSourceSplitReader  - Handling split change SplitAddition:[[FileSourceSplit: s3a://bucket/1000000/part-00004-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet [0, 80055663)  hosts=[localhost] ID=0000000015 position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226712]]
2267:2022-12-27 13:38:47.115 FileSourceSplitReader  - Handling split change SplitAddition:[[FileSourceSplit: s3a://bucket/1000000/part-00005-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet [0, 80022187)  hosts=[localhost] ID=0000000016 position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226346]]
2268:2022-12-27 13:38:47.115 FileSourceSplitReader  - Handling split change SplitAddition:[[FileSourceSplit: s3a://bucket/1000000/part-00003-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet [0, 79996693)  hosts=[localhost] ID=0000000014 position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=227154]]
2269:2022-12-27 13:38:47.115 FileSourceSplitReader  - Handling split change SplitAddition:[[FileSourceSplit: s3a://bucket/1000000/part-00009-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet [0, 80247830)  hosts=[localhost] ID=0000000020 position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226535]]
2270:2022-12-27 13:38:47.115 FileSourceSplitReader  - Handling split change SplitAddition:[[FileSourceSplit: s3a://bucket/1000000/part-00002-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet [0, 80040476)  hosts=[localhost] ID=0000000013 position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=225920]]
2271:2022-12-27 13:38:47.115 FileSourceSplitReader  - Handling split change SplitAddition:[[FileSourceSplit: s3a://bucket/1000000/part-00007-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet [0, 79887236)  hosts=[localhost] ID=0000000018 position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226029]]
2272:2022-12-27 13:38:47.115 FileSourceSplitReader  - Handling split change SplitAddition:[[FileSourceSplit: s3a://bucket/1000000/part-00001-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet [0, 79980911)  hosts=[localhost] ID=0000000012 position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226429]]
2273:2022-12-27 13:38:47.115 FileSourceSplitReader  - Handling split change SplitAddition:[[FileSourceSplit: s3a://bucket/1000000/part-00008-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet [0, 79986997)  hosts=[localhost] ID=0000000019 position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226278]]
2275:2022-12-27 13:38:47.116 FileSourceSplitReader  - Handling split change SplitAddition:[[FileSourceSplit: s3a://bucket/1000000/part-00000-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet [0, 79987191)  hosts=[localhost] ID=0000000011 position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226030]]
2281:2022-12-27 13:38:47.160 SourceReaderBase  - Adding split(s) to reader: [FileSourceSplit: s3a://bucket/2000000/part-00009-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97536205)  hosts=[localhost] ID=0000000040 position=null]
2282:2022-12-27 13:38:47.160 SourceReaderBase  - Adding split(s) to reader: [FileSourceSplit: s3a://bucket/2000000/part-00001-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97489087)  hosts=[localhost] ID=0000000032 position=null]
2283:2022-12-27 13:38:47.159 SourceReaderBase  - Adding split(s) to reader: [FileSourceSplit: s3a://bucket/2000000/part-00002-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97342071)  hosts=[localhost] ID=0000000033 position=null]
2284:2022-12-27 13:38:47.160 SourceReaderBase  - Adding split(s) to reader: [FileSourceSplit: s3a://bucket/2000000/part-00000-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97377047)  hosts=[localhost] ID=0000000031 position=null]
2285:2022-12-27 13:38:47.160 SourceReaderBase  - Adding split(s) to reader: [FileSourceSplit: s3a://bucket/2000000/part-00003-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97406878)  hosts=[localhost] ID=0000000034 position=null]
2288:2022-12-27 13:38:47.161 SourceReaderBase  - Adding split(s) to reader: [FileSourceSplit: s3a://bucket/2000000/part-00005-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97472256)  hosts=[localhost] ID=0000000036 position=null]
2289:2022-12-27 13:38:47.161 SourceReaderBase  - Adding split(s) to reader: [FileSourceSplit: s3a://bucket/2000000/part-00004-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97420601)  hosts=[localhost] ID=0000000035 position=null]
2292:2022-12-27 13:38:47.162 SourceReaderBase  - Adding split(s) to reader: [FileSourceSplit: s3a://bucket/2000000/part-00006-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97495880)  hosts=[localhost] ID=0000000037 position=null]
2293:2022-12-27 13:38:47.163 SourceReaderBase  - Adding split(s) to reader: [FileSourceSplit: s3a://bucket/2000000/part-00007-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97389425)  hosts=[localhost] ID=0000000038 position=null]
2295:2022-12-27 13:38:47.163 SourceReaderBase  - Adding split(s) to reader: [FileSourceSplit: s3a://bucket/2000000/part-00008-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97428709)  hosts=[localhost] ID=0000000039 position=null]
{code}
The same logs are available in a GitHub gist: [https://gist.github.com/WonderBeat/ddfdc852556997b09451d48766b54183]

This can be fixed with a simple reordering of the calls in {{HybridSourceReader}}: {{reader.addSplits()}} -> {{reader.start()}} sounds logical, wdyt?
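
The proposed reordering can be sketched without Flink as follows. {{ModelReader}}, {{ModelHybridReader}} and {{ReorderedRecoveryDemo}} are hypothetical names used only for illustration. Whether the real underlying readers accept {{addSplits()}} before {{start()}} is an assumption that would need to be checked against the actual code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of an underlying reader that requests a split on
// start() only when nothing is assigned. NOT the real Flink class.
class ModelReader {
    private final List<String> assignedSplits = new ArrayList<>();
    private int splitRequestsSent = 0;

    void start() {
        if (assignedSplits.isEmpty()) {
            splitRequestsSent++; // stands in for sendSplitRequest
        }
    }

    void addSplits(List<String> splits) {
        assignedSplits.addAll(splits);
    }

    int splitRequestsSent() {
        return splitRequestsSent;
    }
}

// Hypothetical model of the hybrid reader's switch-over step.
class ModelHybridReader {
    private final List<String> restoredSplits = new ArrayList<>();

    ModelHybridReader(List<String> restored) {
        restoredSplits.addAll(restored);
    }

    // Proposed order: hand over restored splits first, then start the reader.
    ModelReader setCurrentReader() {
        ModelReader reader = new ModelReader();
        if (!restoredSplits.isEmpty()) {
            reader.addSplits(new ArrayList<>(restoredSplits));
            restoredSplits.clear();
        }
        reader.start(); // sees the restored splits, so it sends no request
        return reader;
    }
}

class ReorderedRecoveryDemo {
    static int requestsAfterRecovery(List<String> restored) {
        return new ModelHybridReader(restored).setCurrentReader().splitRequestsSent();
    }

    public static void main(String[] args) {
        System.out.println("with restored splits: "
                + requestsAfterRecovery(List.of("1000000/part-00000")));
        System.out.println("fresh start: " + requestsAfterRecovery(List.of()));
    }
}
```

In this model, a recovery with restored splits sends no split request, while a fresh start with no restored splits still requests one, so the reordering would leave normal startup behaviour unchanged.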



--
This message was sent by Atlassian Jira
(v8.20.10#820010)