Posted to dev@flink.apache.org by "Roman Khachatryan (Jira)" <ji...@apache.org> on 2020/11/11 17:53:00 UTC

[jira] [Created] (FLINK-20097) Race conditions in InputChannel.ChannelStatePersister

Roman Khachatryan created FLINK-20097:
-----------------------------------------

             Summary: Race conditions in InputChannel.ChannelStatePersister
                 Key: FLINK-20097
                 URL: https://issues.apache.org/jira/browse/FLINK-20097
             Project: Flink
          Issue Type: Bug
          Components: Runtime / Checkpointing, Runtime / Network
    Affects Versions: 1.12.0
            Reporter: Roman Khachatryan
            Assignee: Roman Khachatryan
             Fix For: 1.12.0


In InputChannel.ChannelStatePersister, stopPersisting() and checkForBarrier() always update pendingCheckpointBarrierId, potentially overwriting a newer id (or the BARRIER_RECEIVED value) with an older one.


For stopPersisting(), consider a case:
 # Two consecutive UC barriers arrive at the same channel (1st being stale at some point)
 # In RemoteInputChannel.onBuffer, netty thread updates pendingCheckpointBarrierId to BARRIER_RECEIVED
 # Task thread processes the 1st barrier and triggers a checkpoint
 # Task thread processes the 2nd barrier and aborts the 1st checkpoint, calling stopPersisting() from the UC controller and setting pendingCheckpointBarrierId to CHECKPOINT_COMPLETED
 # Task thread starts 2nd checkpoint and calls startPersisting() setting pendingCheckpointBarrierId to 2
 # New buffers now have a chance to be included in the 2nd checkpoint, though they belong to the next one
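
The sequence above can be replayed deterministically with a toy model. This is only a sketch: the class UnguardedPersisterModel, its onBarrier() and wouldPersistNextBuffer() helpers, and the numeric sentinel values are assumptions made for illustration, not the Flink code. It keeps a single field that every call overwrites unconditionally, which is the behaviour described in this issue.

```java
// Hypothetical toy model of the unguarded updates; not the actual Flink class.
final class UnguardedPersisterModel {
    // Assumed sentinel values; only their meaning matters here.
    static final long BARRIER_RECEIVED = -1L;
    static final long CHECKPOINT_COMPLETED = -2L;

    private long pendingCheckpointBarrierId = CHECKPOINT_COMPLETED;

    // Each method overwrites the field without comparing checkpoint ids.
    void startPersisting(long checkpointId) {
        pendingCheckpointBarrierId = checkpointId;
    }

    void stopPersisting() {
        pendingCheckpointBarrierId = CHECKPOINT_COMPLETED;
    }

    // Stands in for the netty thread seeing a barrier on the channel.
    void onBarrier() {
        pendingCheckpointBarrierId = BARRIER_RECEIVED;
    }

    // Assumption: buffers are persisted while a real checkpoint id is pending.
    boolean wouldPersistNextBuffer() {
        return pendingCheckpointBarrierId >= 0;
    }

    public static void main(String[] args) {
        UnguardedPersisterModel channel = new UnguardedPersisterModel();
        channel.onBarrier();        // netty thread: both barriers have arrived
        channel.stopPersisting();   // task thread: 1st checkpoint aborted
        channel.startPersisting(2); // task thread: 2nd checkpoint started
        // The 2nd barrier was already received, yet the channel looks pending again.
        System.out.println("persist next buffer: " + channel.wouldPersistNextBuffer());
    }
}
```

In this model the BARRIER_RECEIVED marker is lost after the stopPersisting() call, so buffers arriving after the 2nd barrier would still be treated as part of the 2nd checkpoint.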

 

For checkForBarrier(), consider an input gate with two channels A and B and two barriers 1 and 2:
 # Channel A receives both barriers, channel B receives nothing yet
 # Task thread processes both barriers on A, eventually triggering 2nd checkpoint
 # Channel A state is now BARRIER_RECEIVED, channel B - pending (with id=2)
 # Channel B receives the 1st barrier and becomes BARRIER_RECEIVED
 # No buffers in B between barriers 1 and 2 will be included in the checkpoint 
 # Channel B receives the 2nd barrier which will eventually conclude the checkpoint

 

I see a solution in performing an action only if the passed checkpointId >= pendingCheckpointBarrierId. For that, a separate field will be needed to hold the status (RECEIVED/COMPLETED/PENDING). The class isn't thread-safe, so adding the field shouldn't be a problem.
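
A minimal sketch of how such a guard might look, assuming a hypothetical class GuardedPersisterSketch with a Status enum and a trackedId field (none of these names come from Flink, and onBarrier() merely stands in for checkForBarrier()). The strict comparison in startPersisting() is an extra assumption, made so that starting a checkpoint whose barrier was already received does not reset the status to PENDING.

```java
// Hypothetical sketch of the proposed guard; not the actual Flink class.
final class GuardedPersisterSketch {
    /** Status kept separately from the id, as the proposal suggests. */
    enum Status { PENDING, BARRIER_RECEIVED, COMPLETED }

    private Status status = Status.COMPLETED;
    private long trackedId = -1L; // assumed initial value: no checkpoint seen yet

    /** Assumption: only a strictly newer checkpoint may reset the status to PENDING. */
    void startPersisting(long checkpointId) {
        if (checkpointId > trackedId) {
            trackedId = checkpointId;
            status = Status.PENDING;
        }
    }

    /** Ignored when the call refers to a checkpoint older than the tracked one. */
    void stopPersisting(long checkpointId) {
        if (checkpointId >= trackedId) {
            trackedId = checkpointId;
            status = Status.COMPLETED;
        }
    }

    /** Stands in for checkForBarrier(); stale barriers leave the state untouched. */
    void onBarrier(long barrierId) {
        if (barrierId >= trackedId) {
            trackedId = barrierId;
            status = Status.BARRIER_RECEIVED;
        }
    }

    Status status() {
        return status;
    }

    long trackedId() {
        return trackedId;
    }

    public static void main(String[] args) {
        // stopPersisting() scenario: both barriers arrive before the task thread acts.
        GuardedPersisterSketch a = new GuardedPersisterSketch();
        a.onBarrier(1);
        a.onBarrier(2);
        a.stopPersisting(1);  // stale abort of the 1st checkpoint is ignored
        a.startPersisting(2); // barrier 2 already seen, status is kept
        System.out.println("same channel: " + a.status() + " id=" + a.trackedId());

        // checkForBarrier() scenario: channel B is pending for checkpoint 2.
        GuardedPersisterSketch b = new GuardedPersisterSketch();
        b.startPersisting(2);
        b.onBarrier(1);       // stale barrier is ignored, B stays pending
        System.out.println("channel B: " + b.status() + " id=" + b.trackedId());
    }
}
```

With the id comparison in place, neither the stale stopPersisting(1) call nor the stale barrier 1 changes the state in this sketch, so both scenarios described above keep the newer checkpoint's status.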
 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)