Posted to issues@flink.apache.org by "Yuan Mei (Jira)" <ji...@apache.org> on 2020/10/30 08:52:00 UTC

[jira] [Comment Edited] (FLINK-19632) Introduce a new ResultPartitionType for Approximate Local Recovery

    [ https://issues.apache.org/jira/browse/FLINK-19632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17223499#comment-17223499 ] 

Yuan Mei edited comment on FLINK-19632 at 10/30/20, 8:51 AM:
-------------------------------------------------------------

Here are some good discussions about the motivation for introducing a new ResultPartitionType, which I think are worth recording here.

quote from [~trohrmann]
{quote}I wanted to ask why we need a special ResultPartitionType for the approximate local recovery? Shouldn't it be conceptually possible that we support the normal and approximative recovery behaviour with the same pipelined partitions?
{quote}
Conceptually speaking, yes, it is possible to unify normal and approximate pipelined partitions; practically speaking, I am not 100% sure they can be. There are two main considerations that lead me to introduce a different type.

*1) The first and most important reason is isolating changes to avoid affecting the normal execution path.*
 Since I am not sure whether the two can be unified, I go with the safe step first. This also helps identify the differences between the two types (there might be more differences for downstream reconnection as well), and there could be corner cases that I will not be aware of until the real implementation.

Even if they can be unified conceptually, having *different implementation subclasses for different connection behavior* does seem reasonable: it keeps the logic for each behavior simple. *So personally, I am leaning toward not unifying them*.

That said, if it turns out to be cleaner and simpler to unify the two types, I have no objection to doing so. But for safety and ease of development, starting with a different type seems the better choice.

*2) Differences between these two types:*
 For upstream reconnection, the two types mainly differ in their *read* and *release* behavior.
 * In normal pipeline mode, for each subpartition, its view is created once, and released when downstream disconnects. View release will cause subpartition release, and eventually partition release.
 * In approximate mode, for each subpartition, a view can be created and released multiple times as long as one view is available at one instant for a subpartition.
 ** for reading: upon reconnection, the reader should clean up the partial record left over from the downstream failure (this could easily be unified)
 ** for release: a partition is released only if it finishes consumption (all data read) or its producer fails. The partition should not be released when all of its views are released, because new views can still be created. (This is a bit difficult under the current setting; let's discuss it in the lifecycle part later.)
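To make the contrast concrete, here is a minimal, hypothetical sketch of the approximate-mode contract. This is not Flink's actual code; the class {{ApproximateSubpartition}} and its methods are made-up names for illustration only. The point is the invariant: at most one view at a time, views can come and go, and the partition itself is released only on full consumption or producer failure.

```java
// Hypothetical sketch (NOT Flink's real classes): a subpartition that,
// unlike the normal pipelined mode, survives view release and allows a
// fresh view to be created on downstream reconnection.
class ApproximateSubpartition {
    private Object currentView;       // the single active reader view
    private boolean allDataConsumed;  // set once every buffer has been read
    private boolean released;         // partition-level release flag

    /** Downstream (re)connects: hand out a fresh view (a real one would also skip the partial record). */
    synchronized Object createReadView() {
        if (released) {
            throw new IllegalStateException("subpartition already released");
        }
        currentView = new Object();
        return currentView;
    }

    /** Downstream disconnects: only the view goes away, NOT the subpartition. */
    synchronized void releaseView() {
        currentView = null;  // normal pipelined mode would release the whole partition here
    }

    synchronized void markAllDataConsumed() {
        allDataConsumed = true;
    }

    /** Release only on full consumption or producer failure. */
    synchronized void releaseIf(boolean producerFailed) {
        if (allDataConsumed || producerFailed) {
            currentView = null;
            released = true;
        }
    }

    synchronized boolean isReleased() {
        return released;
    }
}
```

In normal pipelined mode, {{releaseView}} would instead tear down the whole subpartition; keeping that difference behind a separate type is exactly the isolation argued for in 1).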

quote from [~trohrmann]
{quote}If we say that we can reconnect to every pipelined result partition (including dropping partially consumed results), then it can be the responsibility of the scheduler to make sure that producers are restarted as well in order to ensure exactly/at-least once processing guarantees. If not, then we would simply consume from where we have left off.
{quote}
This seems true for now, since we can achieve exactly-once/at-least-once through RegionPipeline failover, and approximate guarantees through single-task failover. But I am not sure about the future. If we later want to support single-task failover with at-least-once/exactly-once guarantees, where channel data may be persisted somewhere, I cannot say for sure that this is purely a scheduler decision. There is a good chance we would end up introducing more connection types for single-task failover to support at-least-once/exactly-once.

quote from [~trohrmann]
{quote}As far as I understand the existing ResultPartitionType.PIPELINED(_BOUNDED) cannot be used because we release the result partition if the downstream consumer disconnects. I believe that this is not a strict contract of pipelined result partitions but more of an implementation artefact. Couldn't we solve the problem of disappearing pipelined result partitions by binding the lifecyle of a pipelined result partition to the lifecycle of a Task? We could say that a Task can only terminate once the pipelined result partition has been consumed. Moreover, a Task will clean up the result partition if it fails or gets canceled. That way, we have a clearly defined lifecycle and make sure that these results get cleaned up (iff the Task reaches a terminal state).
{quote}
I totally agree.

Right now, the life cycle of {{ResultPartitionType.PIPELINED(_BOUNDED)}} is “bound” to the consumer task. That is not very intuitive, but it is reasonable, because {{PIPELINED(_BOUNDED)}} is consumed only once, and whenever the downstream restarts, the upstream restarts correspondingly.

Is it reasonable to bind the partition to the producer? Yes, I think that follows the best intuition, as long as we make the task terminate only after its produced result partition is consumed. I think this can also simplify logic that should be applied to partitions through task resources but currently cannot be, because the task has already terminated.
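As an illustration of the lifecycle described in the quote above, here is another made-up sketch (again not Flink code; {{ProducerTask}} and its methods are hypothetical names): the producer task may only reach a terminal FINISHED state once its partition has been consumed, and on failure or cancellation it cleans the partition up itself.

```java
// Hypothetical sketch: binding a pipelined result partition's lifecycle
// to its producer task, so the partition never "disappears" while a
// consumer might still reconnect to it.
class ProducerTask {
    enum State { RUNNING, FINISHED, FAILED, CANCELED }

    private State state = State.RUNNING;
    private boolean processingDone;
    private boolean partitionConsumed;
    private boolean partitionReleased;

    /** The task's own work is done; termination still waits for consumption. */
    void finishProcessing() {
        processingDone = true;
        tryTerminate();
    }

    /** Downstream signals that the produced partition is fully consumed. */
    void notifyPartitionConsumed() {
        partitionConsumed = true;
        tryTerminate();
    }

    private void tryTerminate() {
        if (state == State.RUNNING && processingDone && partitionConsumed) {
            partitionReleased = true;  // release travels with task termination
            state = State.FINISHED;
        }
    }

    /** On failure or cancellation the task cleans up its partition directly. */
    void failOrCancel(boolean failed) {
        if (state == State.RUNNING) {
            partitionReleased = true;
            state = failed ? State.FAILED : State.CANCELED;
        }
    }

    State state() { return state; }
    boolean partitionReleased() { return partitionReleased; }
}
```

Under this sketch the partition is released iff the task reaches a terminal state, which is the clearly defined lifecycle the quote asks for.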

 


> Introduce a new ResultPartitionType for Approximate Local Recovery
> ------------------------------------------------------------------
>
>                 Key: FLINK-19632
>                 URL: https://issues.apache.org/jira/browse/FLINK-19632
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Task
>            Reporter: Yuan Mei
>            Assignee: Yuan Mei
>            Priority: Major
>              Labels: pull-request-available
>
> # On downstream node failure, the upstream node needs to release a sub-partition view while keeping the result partition
>  # When the downstream node reconnects, the subpartition view needs to be recreated. If the previous subpartition view still exists, it needs to be released.
> Clean up of the partial records should be handled in a separate Jira ticket FLINK-19547
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)