Posted to dev@flink.apache.org by "Haibo Sun (Jira)" <ji...@apache.org> on 2020/02/19 12:54:00 UTC

[jira] [Created] (FLINK-16174) Add a better tryYield() method to MailboxExecutor to return the lowest priority of the remaining mails

Haibo Sun created FLINK-16174:
---------------------------------

             Summary: Add a better tryYield() method to MailboxExecutor to return the lowest priority of the remaining mails
                 Key: FLINK-16174
                 URL: https://issues.apache.org/jira/browse/FLINK-16174
             Project: Flink
          Issue Type: Improvement
          Components: Runtime / Task
    Affects Versions: 1.10.0
            Reporter: Haibo Sun


Currently, each MailboxExecutor is created with the operator's chainIndex as its priority, and it uses that priority to decide which mails to process. When MailboxExecutor#tryYield is called, it takes the mails of this operator and of all downstream operators in the chain. Sometimes, however, after calling MailboxExecutor#tryYield we need to know whether any mail of the current operator is still in the mailbox, which would simplify some operations.

For example, when we close an operator at runtime, after quiescing the processing time service and waiting for its running timers to finish, we call StreamOperator#close to close the operator once there is no mail of the current operator left in the mailbox. The runtime code for closing an operator could then be simplified as follows.
{code:java}
		quiesceProcessingTimeService().get();
		while (mailboxExecutor.betterTryYield() <= self.priority) {}
		closeOperator(actionExecutor);
{code}

With the existing #tryYield method, if the following simplified code is used to close an operator and a downstream operator is implemented like MailboxOperatorTest.ReplicatingMail, the tryYield() loop never exits, which results in a deadlock.

{code:java}
		quiesceProcessingTimeService().get();
		while (mailboxExecutor.tryYield()) {}
		closeOperator(actionExecutor);
{code}
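
To make the difference between the two loops concrete, here is a minimal toy model in plain Java. It is not Flink's TaskMailbox or MailboxExecutor: ToyMailbox, putReplicating, drainOwnMails, plainLoopTerminates and the NO_MAIL sentinel are hypothetical names invented for this sketch. It also assumes that the proposed method returns the lowest priority among the remaining mails this executor is allowed to run, and a sentinel value when none remain; the issue does not specify these details.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;

/** Toy model of a priority-filtered mailbox (hypothetical, not Flink code). */
class ToyMailbox {
    /** A mail is an action tagged with the chainIndex (priority) of its owner. */
    static final class Mail {
        final int priority;
        final Runnable action;

        Mail(int priority, Runnable action) {
            this.priority = priority;
            this.action = action;
        }
    }

    /** Assumed sentinel: returned when no eligible mail remains. */
    static final int NO_MAIL = Integer.MAX_VALUE;

    private final Deque<Mail> queue = new ArrayDeque<>();

    void put(int priority, Runnable action) {
        queue.addLast(new Mail(priority, action));
    }

    /** Runs the first mail with priority >= minPriority; returns whether one ran. */
    boolean tryYield(int minPriority) {
        Iterator<Mail> it = queue.iterator();
        while (it.hasNext()) {
            Mail mail = it.next();
            if (mail.priority >= minPriority) {
                it.remove();
                mail.action.run(); // may enqueue new mails; the iterator is not used afterwards
                return true;
            }
        }
        return false;
    }

    /** Assumed semantics of the proposal: run at most one eligible mail, then
     *  return the lowest priority among the remaining eligible mails. */
    int betterTryYield(int minPriority) {
        tryYield(minPriority);
        int lowest = NO_MAIL;
        for (Mail mail : queue) {
            if (mail.priority >= minPriority) {
                lowest = Math.min(lowest, mail.priority);
            }
        }
        return lowest;
    }

    /** Enqueues a mail that re-enqueues itself each time it runs. */
    static void putReplicating(ToyMailbox mailbox, int priority) {
        mailbox.put(priority, () -> putReplicating(mailbox, priority));
    }

    /** Proposed close loop: stops once no mail of ownPriority remains. */
    static int drainOwnMails(ToyMailbox mailbox, int ownPriority) {
        int iterations = 0;
        while (mailbox.betterTryYield(ownPriority) <= ownPriority) {
            iterations++;
        }
        return iterations;
    }

    /** Existing close loop, capped at maxSteps so the demonstration returns. */
    static boolean plainLoopTerminates(ToyMailbox mailbox, int ownPriority, int maxSteps) {
        for (int i = 0; i < maxSteps; i++) {
            if (!mailbox.tryYield(ownPriority)) {
                return true;
            }
        }
        return false;
    }
}
```

In this model, with a replicating mail at priority 2 and two ordinary mails at priority 1, drainOwnMails(mailbox, 1) returns after both priority-1 mails have run, while plainLoopTerminates(mailbox, 1, n) reports false for any bound n because the downstream mail keeps the mailbox non-empty.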





--
This message was sent by Atlassian Jira
(v8.3.4#803005)