Posted to issues@flink.apache.org by "Frans King (Jira)" <ji...@apache.org> on 2020/10/06 15:24:00 UTC

[jira] [Updated] (FLINK-19515) Async RequestReply handler concurrency bug

     [ https://issues.apache.org/jira/browse/FLINK-19515?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Frans King updated FLINK-19515:
-------------------------------
    Description: 
The async RequestReply handler implemented in https://issues.apache.org/jira/browse/FLINK-18518 has a concurrency problem.

See lines 151 to 152 of [https://github.com/apache/flink-statefun/blob/master/statefun-python-sdk/statefun/request_reply.py].

While one coroutine is awaiting the invocation, it may yield to the event loop. Another coroutine that was suspended may then resume and call ic.complete(), which sets ic.context to None before the first coroutine has completed.
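The interleaving described above can be illustrated with a minimal, self-contained sketch. The classes below are hypothetical stand-ins, not the statefun SDK classes; the only assumption carried over from this report is that one context object is shared across requests and is cleared on completion.

```python
import asyncio
from types import SimpleNamespace


class StubInvocationContext:
    """Hypothetical stand-in for the SDK's invocation context (not the real class)."""

    def __init__(self):
        self.context = None

    def setup(self, request):
        self.context = SimpleNamespace(states={"request": request})

    def complete(self):
        context = self.context
        states = dict(context.states)  # AttributeError if context is already None
        self.context = None            # cleared for every coroutine sharing it
        return states


class SharedContextHandler:
    """Keeps one context for all requests, which is the problematic shape."""

    def __init__(self):
        self.ic = StubInvocationContext()

    async def __call__(self, request, delay):
        ic = self.ic
        ic.setup(request)
        await asyncio.sleep(delay)  # stands in for awaiting the user function
        return ic.complete()


async def demo_race():
    handler = SharedContextHandler()
    # The second request finishes first and clears the shared context.
    return await asyncio.gather(
        handler("first", 0.02),
        handler("second", 0.01),
        return_exceptions=True,
    )


results = asyncio.run(demo_race())
print(results)  # first entry is an AttributeError, second is the "second" state
```

In this sketch the first request fails with the same kind of AttributeError as in the traceback, because the second request both overwrote and then cleared the shared context while the first was suspended.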

 

In short, the following sequence needs to happen atomically:

{code:python}
ic.setup(request_bytes)
await self.handle_invocation(ic)
return ic.complete()
{code}
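One possible way to make the sequence atomic with respect to other coroutines is to guard it with an asyncio.Lock. This is only a sketch under assumptions: StubContext and LockedHandler are hypothetical names, not SDK classes, and the lock serialises all requests that go through the same handler instance, which may not be acceptable for throughput.

```python
import asyncio
from types import SimpleNamespace


class StubContext:
    """Hypothetical stand-in for the SDK's invocation context."""

    def __init__(self):
        self.context = None

    def setup(self, request):
        self.context = SimpleNamespace(states={"request": request})

    def complete(self):
        states = dict(self.context.states)
        self.context = None
        return states


class LockedHandler:
    """Shares one context, but lets only one request use it at a time."""

    def __init__(self):
        self.ic = StubContext()
        self.lock = asyncio.Lock()

    async def __call__(self, request, delay):
        async with self.lock:
            # setup / await / complete now run without interleaving
            self.ic.setup(request)
            await asyncio.sleep(delay)  # stands in for the user function
            return self.ic.complete()


async def demo_locked():
    # Created inside the running loop so the lock is bound to it on older Pythons.
    handler = LockedHandler()
    return await asyncio.gather(handler("first", 0.02), handler("second", 0.01))


locked_results = asyncio.run(demo_locked())
print(locked_results)
```

With the lock in place both requests complete with their own state, at the cost of the second request waiting for the first.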

 

I worked around this by creating an AsyncRequestReplyHandler for each request.
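The shape of that workaround can be sketched as follows. StubHandler is a hypothetical stand-in that keeps one context per instance; with the real SDK the equivalent would be constructing an AsyncRequestReplyHandler inside the web framework's request handler, and its constructor arguments are not shown here.

```python
import asyncio
from types import SimpleNamespace


class StubHandler:
    """Hypothetical stand-in for a handler that keeps one context per instance."""

    def __init__(self):
        self.context = None

    async def __call__(self, request, delay):
        self.context = SimpleNamespace(states={"request": request})
        await asyncio.sleep(delay)  # stands in for awaiting the user function
        states = dict(self.context.states)
        self.context = None
        return states


async def handle_request(request, delay):
    # A fresh handler per request, so no context is shared between coroutines.
    handler = StubHandler()
    return await handler(request, delay)


async def demo_per_request():
    return await asyncio.gather(
        handle_request("first", 0.02),
        handle_request("second", 0.01),
    )


per_request_results = asyncio.run(demo_per_request())
print(per_request_results)
```

Unlike a lock, this keeps requests concurrent, since each coroutine only ever touches its own context.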

 

It should be possible to reproduce this by adding an await asyncio.sleep(5) to the greeter example and then running it under gunicorn with a single worker (-w 1), i.e. a single asyncio thread/event loop.

The resulting traceback:

{code}
    response_data = await handler(request_data)
  File "/home/pi/.local/lib/python3.7/site-packages/statefun/request_reply.py", line 152, in __call__
    return ic.complete()
  File "/home/pi/.local/lib/python3.7/site-packages/statefun/request_reply.py", line 57, in complete
    self.add_mutations(context, invocation_result)
  File "/home/pi/.local/lib/python3.7/site-packages/statefun/request_reply.py", line 82, in add_mutations
    for name, handle in context.states.items():
AttributeError: 'NoneType' object has no attribute 'states'
{code}
 


> Async RequestReply handler concurrency bug
> ------------------------------------------
>
>                 Key: FLINK-19515
>                 URL: https://issues.apache.org/jira/browse/FLINK-19515
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: statefun-2.2.0
>            Reporter: Frans King
>            Priority: Minor
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)