Posted to dev@beam.apache.org by Jozef Vilcek <jo...@gmail.com> on 2018/11/20 10:38:34 UTC

[BEAM-6077] FlinkRunner: Make UnboundedSource state re-scale friendly

I want to reach out for opinions on what would be the best way to proceed
with https://issues.apache.org/jira/browse/BEAM-6077

The problem is that when a FlinkRunner job is restored from a
checkpoint, it needs to resurrect the source and its readers from the
checkpoint state. Each state element is an
`UnboundedSource.CheckpointMark`, which exposes little information about
its contents. Within a CheckpointMark there might already be state stored
per key; e.g. in the case of Kafka it is a list of PartitionMarks, each
holding a partition_id and an offset.

An UnboundedSource can create a reader from a single CheckpointMark, and a
reader can produce a single CheckpointMark from its state. After a rescale,
the number of CheckpointMarks retrieved from state no longer matches the
actual parallelism, so a merge or flatten needs to be applied to the list of
marks read from state. The question is where such logic and knowledge
should live.
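
To make the merge/flatten step concrete, here is a minimal sketch in Java,
assuming a Kafka-like mark that is simply a list of per-partition entries.
`PartitionMark`, `flatten` and `regroup` are hypothetical names used for
illustration only; they are not part of the Beam SDK or KafkaIO, and a real
source may not be able to expose the contents of its mark this way.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Illustration only: hypothetical stand-ins, not Beam or KafkaIO classes. */
final class CheckpointRescaleSketch {

  /** Per-partition state: a partition id plus the offset to resume from. */
  static final class PartitionMark {
    final int partitionId;
    final long offset;

    PartitionMark(int partitionId, long offset) {
      this.partitionId = partitionId;
      this.offset = offset;
    }
  }

  /** Flattens all restored marks into one list, ordered by partition id. */
  static List<PartitionMark> flatten(List<List<PartitionMark>> restoredMarks) {
    List<PartitionMark> all = new ArrayList<>();
    for (List<PartitionMark> mark : restoredMarks) {
      all.addAll(mark);
    }
    all.sort(Comparator.comparingInt((PartitionMark m) -> m.partitionId));
    return all;
  }

  /** Regroups the flattened entries round-robin into one mark per reader. */
  static List<List<PartitionMark>> regroup(List<PartitionMark> all, int parallelism) {
    if (parallelism < 1) {
      throw new IllegalArgumentException("parallelism must be >= 1");
    }
    List<List<PartitionMark>> marks = new ArrayList<>();
    for (int i = 0; i < parallelism; i++) {
      marks.add(new ArrayList<>());
    }
    for (int i = 0; i < all.size(); i++) {
      marks.get(i % parallelism).add(all.get(i));
    }
    return marks;
  }
}
```

With two restored marks covering four partitions, `regroup(flatten(restored), 3)`
yields three marks, one of which carries two partitions. The open question
above is exactly who should own logic like this, since the runner only sees
the opaque mark type.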

It feels similar to UnboundedSource.split(parallelism, pipelineOptions) and
also maybe related somehow to SplittableDoFn logic. Not sure.

My questions are:
1. Is there a way to achieve such splitting / merging of checkpoint marks
with the current SDK?
2. If not, and it makes sense to add it, where would it best go? In the Source?
3. Is there some other approach that a Beam rookie like me does not see?

Best,
Jozef

Re: [BEAM-6077] FlinkRunner: Make UnboundedSource state re-scale friendly

Posted by Maximilian Michels <mx...@apache.org>.
Hi Jozef,

I responded on JIRA today before I saw your mail here.

The splitting of the UnboundedSource is performed during translation of 
the Beam pipeline. I think it would be feasible to use Flink's maximum
parallelism instead of the configured parallelism. That would make it
possible to increase the parallelism at a later point in time.

Another option would be to split the sources again when scaling up; I'm 
not sure whether that would work for all sources. Scaling down should be 
easy because the wrapper supports reading from multiple sources.
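
As a rough illustration of the first option, the sketch below assumes the
source was split once into a fixed number of splits (for example Flink's
maximum parallelism) and shows how those splits could be spread over
whatever parallelism the job currently runs with. `SplitAssignmentSketch`
and `assign` are hypothetical names, and the round-robin rule is an
assumption made for the example; it is not how Flink or the FlinkRunner
wrapper actually redistributes state.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustration only: a hypothetical split-to-subtask assignment. */
final class SplitAssignmentSketch {

  /**
   * Assigns split indices 0..numSplits-1 to subtasks round-robin.
   * Element i of the result lists the splits read by subtask i.
   */
  static List<List<Integer>> assign(int numSplits, int parallelism) {
    if (numSplits < 0 || parallelism < 1) {
      throw new IllegalArgumentException("numSplits >= 0 and parallelism >= 1 required");
    }
    List<List<Integer>> bySubtask = new ArrayList<>();
    for (int subtask = 0; subtask < parallelism; subtask++) {
      bySubtask.add(new ArrayList<>());
    }
    for (int split = 0; split < numSplits; split++) {
      bySubtask.get(split % parallelism).add(split);
    }
    return bySubtask;
  }
}
```

With eight splits, `assign(8, 2)` gives each subtask four splits and
`assign(8, 8)` gives each subtask one, so the same set of splits serves any
parallelism up to the number of splits. Beyond that, extra subtasks would
stay idle, which is where re-splitting would be needed.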

Cheers,
Max
