Posted to user@flink.apache.org by Meghajit Mazumdar <me...@gojek.com> on 2022/05/31 08:29:54 UTC

FileSource SourceReader failure scenario

Hello,

I have a question about how FileSource and SourceReader behave when
failures occur. Please let me know if I have missed something
conceptually.

We are running a Parquet File Source. Let's say, we supply the source with
a directory path containing 5 files and the Flink job is configured to run
with a parallelism of 2.

When the job starts, 2 SourceReaders are created and when they ask for
splits, the split assigner assigns them one file each, which they start
processing.

Now, the documentation of FileSplitAssigner.addSplits
<https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/file/src/assigners/FileSplitAssigner.html#addSplits-java.util.Collection->
method says the following:
*Adds a set of splits to this assigner. This happens for example when some
split processing failed and the splits need to be re-added, or when new
splits got discovered.*

I understand this means that un-processed splits or splits that were not
processed completely due to some error with the SourceReader get added back
to the split assigner to be re-assigned to some other SourceReader.

However, the documentation of FileRecordFormat.restoreReader
<https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/file/src/reader/FileRecordFormat.html#restoreReader-org.apache.flink.configuration.Configuration-org.apache.flink.core.fs.Path-long-long-long->
says the following:
*Restores a reader from a checkpointed position. This method is called when
the reader is recovered from a checkpoint and the reader has previously
stored an offset into the checkpoint, by returning from the
FileRecordFormat.Reader.getCheckpointedPosition() a value with non-negative
offset. That value is supplied as the restoredOffset.*

I am somewhat confused by these 2 statements. If a split is added back to
the split assigner when a SourceReader fails while processing it (perhaps
due to an exception or a fatal error), then the split could next be
re-assigned to any other SourceReader. Even if the failed SourceReader
comes back and resumes the file from the last checkpointed offset, there
would be duplicate processing, because the file could have been assigned to
another reader in the meantime. What, then, is the purpose of
`restoreReader`? Or am I missing something?

-- 
*Regards,*
*Meghajit*

Re: FileSource SourceReader failure scenario

Posted by Meghajit Mazumdar <me...@gojek.com>.
Thanks, Qingsheng! This answers my question.

Regards,
Meghajit

On Tue, May 31, 2022 at 3:03 PM Qingsheng Ren <re...@gmail.com> wrote:

> Hi Meghajit,
>
> Good question! The short answer: once a split has been assigned to a
> reader and *checkpointed*, it is no longer returned to the enumerator.
>
> As described by the JavaDoc of SplitEnumerator#addSplitsBack [1]:
>
> > Add a split back to the split enumerator. It will only happen when a
> > SourceReader fails and there are splits assigned to it after the last
> > successful checkpoint.
>
> Suppose we have split A and reader 0, and we have a flow like this:
>
> Checkpoint 100 -> Split assignment (A -> 0) -> Checkpoint 101
>
> After checkpoint 101, the state of split A is managed by reader 0. If the
> reader fails and rolls back to checkpoint 101, split A is recovered by the
> reader rather than returned to the enumerator, because the split was
> delivered to the reader and stored in the reader’s checkpoint 101. But if
> reader 0 fails before checkpoint 101 and rolls back to checkpoint 100, it
> is not aware of the assignment of split A, so A is added back to the
> enumerator and assigned again.
>
> In a nutshell, once a split has been assigned to a reader and a checkpoint
> has completed successfully, it is the reader’s responsibility to keep the
> split’s state and recover it, and the split won’t be returned to the
> enumerator. Under this pattern, a split is not assigned or read twice.
>
> Hope this is helpful!
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/SplitEnumerator.html
>
> Cheers,
>
> Qingsheng


Re: FileSource SourceReader failure scenario

Posted by Qingsheng Ren <re...@gmail.com>.
Hi Meghajit,

Good question! The short answer: once a split has been assigned to a reader and *checkpointed*, it is no longer returned to the enumerator.

As described by the JavaDoc of SplitEnumerator#addSplitsBack [1]:

> Add a split back to the split enumerator. It will only happen when a SourceReader fails and there are splits assigned to it after the last successful checkpoint.

Suppose we have split A and reader 0, and we have a flow like this:

Checkpoint 100 -> Split assignment (A -> 0) -> Checkpoint 101

After checkpoint 101, the state of split A is managed by reader 0. If the reader fails and rolls back to checkpoint 101, split A is recovered by the reader rather than returned to the enumerator, because the split was delivered to the reader and stored in the reader’s checkpoint 101. But if reader 0 fails before checkpoint 101 and rolls back to checkpoint 100, it is not aware of the assignment of split A, so A is added back to the enumerator and assigned again.
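
One rough way to picture the two cases is the toy model below. It is plain Java, not Flink code: `SplitOwnershipDemo`, `AssignmentTracker` and its methods are hypothetical names made up for illustration, and it simplifies by treating every checkpoint as covering all assignments made so far. It only shows the rule that splits assigned after the last successful checkpoint are the ones that go back to the enumerator.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model (not Flink code) of which splits go back to the enumerator after a reader failure. */
public class SplitOwnershipDemo {

    /** Tracks, per reader, the splits assigned since the last successful checkpoint. */
    static class AssignmentTracker {
        private final Map<Integer, List<String>> uncheckpointed = new HashMap<>();

        /** The enumerator hands a split to a reader. */
        void assign(int readerId, String split) {
            uncheckpointed.computeIfAbsent(readerId, k -> new ArrayList<>()).add(split);
        }

        /** A checkpoint succeeded: assignments made so far are now part of reader state. */
        void checkpointCompleted() {
            uncheckpointed.clear();
        }

        /** The reader failed: return only the splits its last checkpoint does not contain. */
        List<String> splitsToAddBack(int readerId) {
            List<String> back = uncheckpointed.remove(readerId);
            return back == null ? new ArrayList<>() : back;
        }
    }

    public static void main(String[] args) {
        // Case 1: reader 0 fails after checkpoint 101 has covered the assignment of A.
        AssignmentTracker afterCheckpoint = new AssignmentTracker();
        afterCheckpoint.assign(0, "A");
        afterCheckpoint.checkpointCompleted(); // checkpoint 101
        System.out.println("fail after 101:  add back " + afterCheckpoint.splitsToAddBack(0));

        // Case 2: reader 0 fails before checkpoint 101, so it rolls back to checkpoint 100.
        AssignmentTracker beforeCheckpoint = new AssignmentTracker();
        beforeCheckpoint.assign(0, "A");
        System.out.println("fail before 101: add back " + beforeCheckpoint.splitsToAddBack(0));
    }
}
```

In the first case nothing is added back, because split A already lives in the reader's checkpointed state. In the second case A is the only split added back.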

In a nutshell, once a split has been assigned to a reader and a checkpoint has completed successfully, it is the reader’s responsibility to keep the split’s state and recover it, and the split won’t be returned to the enumerator. Under this pattern, a split is not assigned or read twice.
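
The reader side of this, which is where something like `restoreReader` comes in, can be sketched in the same spirit. Again this is a toy in plain Java and not the FileSource implementation: `ResumeFromOffsetDemo`, `ToyReader` and `runWithFailure` are hypothetical names, the "offset" is just a record index, and the sketch assumes that anything emitted after the checkpoint is discarded because downstream state rolls back to the same checkpoint.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (not Flink code) of a reader that resumes a split from a checkpointed offset. */
public class ResumeFromOffsetDemo {

    /** Reads the records of one split in order and remembers how many it has emitted. */
    static class ToyReader {
        private final List<String> records;
        private int offset; // index of the next record to emit

        ToyReader(List<String> records, int restoredOffset) {
            this.records = records;
            this.offset = restoredOffset;
        }

        boolean hasNext() {
            return offset < records.size();
        }

        String next() {
            return records.get(offset++);
        }

        /** The position that a checkpoint would store for this split. */
        int checkpointedPosition() {
            return offset;
        }
    }

    /** Emits two records, checkpoints, emits one more, then "fails" and resumes from the checkpoint. */
    static List<String> runWithFailure(List<String> file) {
        List<String> committed = new ArrayList<>();

        ToyReader reader = new ToyReader(file, 0);
        committed.add(reader.next());
        committed.add(reader.next());
        int checkpoint = reader.checkpointedPosition(); // checkpoint taken here
        reader.next(); // emitted after the checkpoint; assumed lost when the job rolls back

        // Failure: the same reader gets its split back at the stored offset,
        // so it continues from the third record instead of starting over.
        ToyReader restored = new ToyReader(file, checkpoint);
        while (restored.hasNext()) {
            committed.add(restored.next());
        }
        return committed;
    }

    public static void main(String[] args) {
        System.out.println(runWithFailure(List.of("r0", "r1", "r2", "r3")));
    }
}
```

Each record ends up in the result exactly once, because the split never went back to the enumerator and the restored reader resumed from the stored position.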

Hope this is helpful!

[1] https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/SplitEnumerator.html

Cheers, 

Qingsheng
