Posted to dev@beam.apache.org by ma...@seznam.cz on 2020/02/07 11:18:07 UTC

SplittableDoFn with Flink fails at checkpointing larger files (200MB)

Hi,


   I am using FileIO, continuously watching a folder for new files to 
process. The problem occurs when Flink starts reading a 200MB file (around 
3M elements) while a checkpoint is triggered. The checkpoint never finishes 
until the WHOLE file is processed. 

Minimal example:
https://github.com/seznam/beam/blob/simunek/failingCheckpoint/examples/java/src/main/java/org/apache/beam/examples/CheckpointFailingExample.java
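
For reference, this is a minimal sketch of the pipeline shape (the 
filepattern and poll interval below are placeholders, not the values from 
the linked example):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Watch;
import org.joda.time.Duration;

public class WatchedFileRead {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    p.apply(FileIO.match()
            .filepattern("/data/incoming/*.txt")        // hypothetical folder
            .continuously(Duration.standardSeconds(30), // keep polling for new files
                Watch.Growth.never()))
     .apply(FileIO.readMatches())
     .apply(TextIO.readFiles());   // a 200MB file is around 3M elements
    p.run();
  }
}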

My theory of what could be wrong, from my understanding:
The CheckpointMark in this case starts from Create.ofProvider and is then 
propagated to the downstream operators, where it is queued behind all the 
splits. That means all splits have to be read before the operator can 
successfully checkpoint. The problem gets even bigger when there are more 
files, because then we need to wait for all files to be processed before 
the checkpoint can complete.

1. Are my assumptions correct?
2. Is there any possibility to improve the behavior of SplittableDoFn (or 
the subsequent reading from BoundedSource) on Flink so that the checkpoint 
barrier propagates sooner?
 
For now my fix is reading smaller files (30MB) one by one, but it's not 
very future proof.

Versions:
Beam 2.17 
Flink 1.9

Please correct my poor understanding of checkpointing with Beam and Flink; 
it would be wonderful if you have some advice on what to improve or where 
to look.

Re: SplittableDoFn with Flink fails at checkpointing larger files (200MB)

Posted by Maximilian Michels <mx...@apache.org>.
Hi Marek,

That's a great question. The answer depends on whether you are using 
portability or the "classic" Runner:

Portability
===========

In portability, the SDF functionality includes the option for the Runner 
to split a given bundle such that the current bundle's remaining work is 
minimized and the deferred remainder of the work is maximized. That would 
be ideal for checkpointing as soon as possible [1].

Unfortunately, this is not yet implemented in the Flink Runner. That's 
why you are seeing the entire split finishing before the checkpoint.

Implementing this would mean issuing the split call upon checkpointing, 
making sure the remaining work is checkpointed, and resuming it after the 
checkpoint has finished.
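
To sketch the SDF side of the mechanism (this is illustrative, not the 
actual Flink Runner code; the restriction size and resume interval are 
made up):

import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;

public class FileOffsetsFn extends DoFn<String, Long> {

  @GetInitialRestriction
  public OffsetRange getInitialRestriction(String file) {
    // Illustrative: pretend each file has ~3M records.
    return new OffsetRange(0, 3_000_000);
  }

  @ProcessElement
  public ProcessContinuation processElement(
      ProcessContext c, RestrictionTracker<OffsetRange, Long> tracker) {
    for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); i++) {
      c.output(i); // stand-in for reading record i of the file
      if ((i + 1) % 10_000 == 0) {
        // Defer the rest of the restriction; the runner checkpoints the
        // remainder and schedules it to resume later.
        return ProcessContinuation.resume();
      }
    }
    return ProcessContinuation.stop();
  }
}

The runner-side part, i.e. triggering such a split when the checkpoint 
barrier arrives, is exactly the piece that is missing in the Flink Runner.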

Perhaps others could chime in as well, in case there is anything I am missing?

Classic
=======

AFAIK there is no way to do splitting while processing a split. The best 
option would be to create a custom UnboundedSource which produces smaller 
splits; the default is to use the parallelism as the number of splits. 
Depending on your source, this may or may not be trivial.
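
Roughly like this (FileChunkSource and forChunk are hypothetical names; 
the remaining abstract methods of UnboundedSource, e.g. createReader and 
getCheckpointMarkCoder, are omitted and the class is kept abstract so the 
sketch stays short):

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;

public abstract class FileChunkSource
    extends UnboundedSource<String, UnboundedSource.CheckpointMark> {

  // Illustrative over-partitioning factor: more, smaller splits mean each
  // split finishes sooner, so the checkpoint barrier gets through earlier.
  private static final int OVER_PARTITIONING = 8;

  // Hypothetical: build a source that reads only the i-th chunk.
  protected abstract FileChunkSource forChunk(int chunkIndex, int numChunks);

  @Override
  public List<? extends FileChunkSource> split(int desiredNumSplits, PipelineOptions options) {
    // The default is roughly one split per unit of parallelism; return
    // more, smaller splits instead.
    int numChunks = desiredNumSplits * OVER_PARTITIONING;
    List<FileChunkSource> splits = new ArrayList<>(numChunks);
    for (int i = 0; i < numChunks; i++) {
      splits.add(forChunk(i, numChunks));
    }
    return splits;
  }
}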

Cheers,
Max

[1] 
https://github.com/apache/beam/blob/6266296ac037afc775735d4f08d25ffcc1a8e897/model/fn-execution/src/main/proto/beam_fn_api.proto#L421
