Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2017/01/25 13:20:26 UTC

[jira] [Commented] (FLINK-5638) Deadlock when closing two chained async I/O operators

    [ https://issues.apache.org/jira/browse/FLINK-5638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15837716#comment-15837716 ] 

ASF GitHub Bot commented on FLINK-5638:
---------------------------------------

GitHub user tillrohrmann opened a pull request:

    https://github.com/apache/flink/pull/3209

    [FLINK-5638] [asyncIO] Fix deadlock when closing two chained AsyncWaitOperators

    This PR addresses the problem by changing the Emitter's behaviour to first output the
    element and only then remove it from the StreamElementQueue. That way, the close method
    also waits until the Emitter has output the last completed element. Additionally, the
    stopResources method now releases the checkpoint lock so that the emitter thread can
    react to the interrupt signal.
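
As a rough illustration of the first change (output first, remove afterwards), here is a minimal Java sketch. It is not Flink's actual Emitter or StreamElementQueue code: the class EmitterOrderSketch and its methods are hypothetical, an ArrayDeque stands in for the StreamElementQueue, a plain Object monitor stands in for the checkpoint lock, and a blocking output() models the chained call into a full downstream queue. Because the head element is removed only after output() returns, a close() that waits for an empty queue cannot finish while that output is still blocked.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of one AsyncWaitOperator's queue and Emitter (not Flink code). */
class EmitterOrderSketch {
    final Object checkpointLock = new Object();          // stands in for the checkpoint lock
    final ArrayDeque<String> queue = new ArrayDeque<>(); // stands in for the StreamElementQueue
    final List<String> downstream = new ArrayList<>();   // elements the chained operator received
    boolean downstreamHasSpace = false;                  // models a2's full queue

    /** Chained output: waits (releasing the lock) while the downstream queue is full. */
    void output(String element) throws InterruptedException {
        while (!downstreamHasSpace) {
            checkpointLock.wait(); // releases checkpointLock, so the output is not atomic
        }
        downstream.add(element);
    }

    /** Emitter step in the fixed order: output the head first, remove it afterwards. */
    void emitHead() throws InterruptedException {
        synchronized (checkpointLock) {
            String head = queue.peek();
            if (head == null) {
                return;
            }
            output(head);
            queue.poll();               // only now does the queue become empty
            checkpointLock.notifyAll(); // wake a close() that waits for an empty queue
        }
    }

    /** Hypothetical close() step: wait until the queue is empty. */
    void waitUntilEmpty() throws InterruptedException {
        synchronized (checkpointLock) {
            while (!queue.isEmpty()) {
                checkpointLock.wait();
            }
        }
    }

    /** Returns true if close() kept waiting while the last element's output was blocked. */
    static boolean closeWaitsForOutput() throws InterruptedException {
        EmitterOrderSketch op = new EmitterOrderSketch();
        op.queue.add("last");
        Thread emitter = new Thread(() -> {
            try { op.emitHead(); } catch (InterruptedException ignored) { }
        });
        Thread closer = new Thread(() -> {
            try { op.waitUntilEmpty(); } catch (InterruptedException ignored) { }
        });
        emitter.start();
        closer.start();
        closer.join(200);
        boolean waited = closer.isAlive(); // "last" is still queued while output() blocks
        synchronized (op.checkpointLock) {
            op.downstreamHasSpace = true;  // the downstream queue frees a slot
            op.checkpointLock.notifyAll();
        }
        emitter.join();
        closer.join();
        return waited && op.downstream.contains("last") && op.queue.isEmpty();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("close waited for output: " + closeWaitsForOutput());
    }
}
```

With the old order (poll before output), the queue would already be empty while output() is blocked, so waitUntilEmpty() could return before the element actually reached the next operator.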

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tillrohrmann/flink asyncIOFixDeadlock

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3209.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3209
    
----
commit 30520d95786d630eb14ff613d0990ce03779dd3c
Author: Till Rohrmann <tr...@apache.org>
Date:   2017-01-25T13:11:48Z

    [FLINK-5638] [asyncIO] Fix deadlock when closing two chained AsyncWaitOperators

----


> Deadlock when closing two chained async I/O operators
> -----------------------------------------------------
>
>                 Key: FLINK-5638
>                 URL: https://issues.apache.org/jira/browse/FLINK-5638
>             Project: Flink
>          Issue Type: Bug
>          Components: Local Runtime
>    Affects Versions: 1.2.0, 1.3.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>             Fix For: 1.2.0, 1.3.0
>
>
> The {{AsyncWaitOperator}} can deadlock in a special case: when two chained {{AsyncWaitOperators}} are closed while one element is still in flight between them.
> The deadlock scenario is the following: Consider two chained {{AsyncWaitOperators}}, {{a1}} and {{a2}}, where the last element of {{a1}} has just completed. This notifies {{a1}}'s {{Emitter}}, {{e1}}, to remove the element from the queue and output it to {{a2}}. This poll-and-output operation happens under the checkpoint lock. Since {{a1}} and {{a2}} are chained, the {{e1}} thread directly calls {{a2}}'s {{processElement}} function, which tries to add the new element to {{a2}}'s {{StreamElementQueue}}. Now assume that this queue is full. Then the operation releases the checkpoint lock and waits until it is notified again.
> In the meantime, the {{StreamTask}} calls {{a1.close()}} because all input has been consumed. The close operation also happens under the checkpoint lock. First, the close method waits until all elements of the {{StreamElementQueue}} have been processed (i.e. the queue is empty), which is already the case because {{e1}} polled the element before outputting it. This waiting happens on the checkpoint lock. Next, {{e1}} is interrupted and the close method joins on {{e1}}. At the time of the interrupt, {{e1}} is waiting on the checkpoint lock, and it must re-acquire that lock before it can react to the interrupt. Since the close operation does not release the checkpoint lock while joining, {{e1}} can never regain it, and the two threads deadlock.
> Two problems cause the deadlock:
> 1. We assume that the {{AsyncWaitOperator}} has processed all its elements once its queue is empty. This holds as long as the output operation is atomic. In the chained case, however, the emitter thread may have to wait to insert the element into the queue of the next {{AsyncWaitOperator}}. In that situation we release the checkpoint lock, so the output operation is no longer atomic. We can solve this problem by polling the last queue element after outputting it rather than before.
> 2. We interrupt the emitter thread while holding the checkpoint lock and never release it. Under these circumstances the interrupt signal is useless, because the emitter thread also needs to acquire the checkpoint lock to react to it. We should solve this problem by waiting on the checkpoint lock and periodically checking whether the thread has already stopped.
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/194729330/log.txt
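
The interaction described in the issue, and the timed-wait remedy from point 2, can be reproduced in isolation with a minimal Java sketch. It is not Flink's code: StopEmitterSketch and emitterStops are hypothetical names, a plain Object monitor models the checkpoint lock, the emitter is parked in wait() as e1 is while a2's queue is full, and the 10 ms polling interval and the 200 ms / 5 s timeouts are arbitrary choices for the demonstration (the old variant uses a bounded join only so that the demo terminates).

```java
import java.util.concurrent.CountDownLatch;

/** Hypothetical model of the close()/Emitter interaction (not Flink's actual code). */
class StopEmitterSketch {

    /**
     * Parks an "emitter" thread in checkpointLock.wait(), then stops it from a thread
     * that holds the checkpoint lock, as close() does.
     *
     * @param releaseLockWhileWaiting false: interrupt + join under the lock (old behaviour);
     *                                true: interrupt + timed checkpointLock.wait() (fix 2)
     * @return true if the emitter had terminated before the simulated close() finished
     */
    static boolean emitterStops(boolean releaseLockWhileWaiting) throws InterruptedException {
        final Object checkpointLock = new Object();
        final CountDownLatch waiting = new CountDownLatch(1);

        Thread emitter = new Thread(() -> {
            synchronized (checkpointLock) {
                waiting.countDown();
                try {
                    checkpointLock.wait(); // models waiting for space in a2's full queue
                } catch (InterruptedException e) {
                    // Reached only after re-acquiring checkpointLock; the emitter then exits.
                }
            }
        });
        emitter.start();
        waiting.await();

        boolean stopped;
        synchronized (checkpointLock) { // the simulated close() runs under the checkpoint lock
            // Getting the lock here means the emitter is already inside wait().
            emitter.interrupt();
            if (releaseLockWhileWaiting) {
                long deadline = System.nanoTime() + 5_000_000_000L;
                // wait(10) releases checkpointLock, so the emitter can react to the interrupt
                while (emitter.isAlive() && System.nanoTime() - deadline < 0) {
                    checkpointLock.wait(10);
                }
            } else {
                emitter.join(200); // join() keeps holding checkpointLock
            }
            stopped = !emitter.isAlive();
        }
        emitter.join(); // once the lock is free, the old variant's emitter also finishes
        return stopped;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("join under lock stopped emitter: " + emitterStops(false));
        System.out.println("timed wait stopped emitter:      " + emitterStops(true));
    }
}
```

In the first variant the interrupted emitter stays blocked, because Object.wait() has to re-acquire the monitor before it can throw InterruptedException and join() does not release it; the second variant releases the lock every 10 ms, so the first call is expected to report false and the second true.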



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)