Posted to issues@flink.apache.org by "Frans King (Jira)" <ji...@apache.org> on 2020/10/06 15:18:00 UTC

[jira] [Commented] (FLINK-18518) Add Async RequestReply handler for the Python SDK

    [ https://issues.apache.org/jira/browse/FLINK-18518?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17208836#comment-17208836 ] 

Frans King commented on FLINK-18518:
------------------------------------

I think there might be a concurrency bug in the implementation of AsyncRequestReplyHandler; see https://issues.apache.org/jira/browse/FLINK-19515

 

ic.setup()     <- sets ic.context = some value

await ....

ic.complete()  <- sets ic.context = None

As a result, ic.context can already be None by the time complete() runs, depending on how the coroutines yield and resume, which results in:

 

line 57, in complete
    self.add_mutations(context, invocation_result)
  File "/home/pi/.local/lib/python3.7/site-packages/statefun/request_reply.py", line 82, in add_mutations
    for name, handle in context.states.items():
AttributeError: 'NoneType' object has no attribute 'states'
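
The interleaving can be reproduced outside StateFun with a minimal sketch. It assumes that one invocation-context object is shared by concurrent requests, which is what the traceback suggests but is not confirmed here. SharedInvocationContext, handle_shared and handle_isolated are hypothetical names used for illustration only; they are not part of the SDK.

```python
import asyncio
from types import SimpleNamespace


class SharedInvocationContext:
    """Hypothetical stand-in for the handler's invocation state (not the SDK class)."""

    def __init__(self):
        self.context = None

    def setup(self, name):
        # Corresponds to ic.setup(): sets ic.context to some value.
        self.context = SimpleNamespace(name=name, states={})

    def complete(self):
        # Corresponds to ic.complete(): reads the context, then resets it to None.
        context = self.context
        self.context = None
        # Raises AttributeError if another coroutine has already reset the context.
        return list(context.states.items())


async def handle_shared(ic, name, delay):
    ic.setup(name)
    await asyncio.sleep(delay)  # yields to the event loop; other requests run here
    return ic.complete()


async def handle_isolated(name, delay):
    ic = SharedInvocationContext()  # assumed mitigation: one context per request
    ic.setup(name)
    await asyncio.sleep(delay)
    return ic.complete()


async def run_shared():
    ic = SharedInvocationContext()
    # Request "a" finishes first and resets the context that "b" still needs.
    return await asyncio.gather(
        handle_shared(ic, "a", 0.01),
        handle_shared(ic, "b", 0.05),
        return_exceptions=True,
    )


async def run_isolated():
    return await asyncio.gather(
        handle_isolated("a", 0.01),
        handle_isolated("b", 0.05),
        return_exceptions=True,
    )


if __name__ == "__main__":
    print(asyncio.run(run_shared()))
    print(asyncio.run(run_isolated()))
```

In the shared case the second request fails with the same kind of AttributeError as above, while giving each request its own context avoids it. Whether that is the right fix for the handler itself is a separate question for FLINK-19515.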

 

 

> Add Async RequestReply handler for the Python SDK
> -------------------------------------------------
>
>                 Key: FLINK-18518
>                 URL: https://issues.apache.org/jira/browse/FLINK-18518
>             Project: Flink
>          Issue Type: New Feature
>          Components: Stateful Functions
>    Affects Versions: statefun-2.1.0
>            Reporter: Igal Shilman
>            Assignee: Igal Shilman
>            Priority: Major
>              Labels: beginner-friendly, pull-request-available
>             Fix For: statefun-2.2.0
>
>
> I/O-bound stateful functions can benefit from the built-in asyncio support in Python, but the
> RequestReply handler is not asyncio-compatible. See [this|https://stackoverflow.com/questions/62640283/flink-stateful-functions-async-calls-with-the-python-sdk] question on Stack Overflow.
>  
> An asyncio-compatible handler would open the door to using libraries such as aiohttp, for example:
>  
> {code:java}
> import aiohttp
> import asyncio
> from aiohttp import web
> ...
> # Helper that fetches a page body without blocking the event loop.
> async def fetch(session, url):
>     async with session.get(url) as response:
>         return await response.text()
>
> @functions.bind("example/hello")
> async def hello(context, message):
>     async with aiohttp.ClientSession() as session:
>         html = await fetch(session, 'http://python.org')
>         context.pack_and_reply(SomeProtobufMessage(html))
>
> handler = AsyncRequestReplyHandler(functions)
>
> # aiohttp endpoint that forwards the raw request bytes to the handler.
> async def handle(request):
>     req = await request.read()
>     res = await handler(req)
>     return web.Response(body=res, content_type="application/octet-stream")
>
> app = web.Application()
> app.add_routes([web.post('/statefun', handle)])
>
> if __name__ == '__main__':
>     web.run_app(app, port=5000)
>  {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)