You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Frans King (Jira)" <ji...@apache.org> on 2020/10/06 15:18:00 UTC
[jira] [Commented] (FLINK-18518) Add Async RequestReply handler for
the Python SDK
[ https://issues.apache.org/jira/browse/FLINK-18518?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17208836#comment-17208836 ]
Frans King commented on FLINK-18518:
------------------------------------
I think there might be a concurrency bug with the implementation of AsyncRequestReplyHandler - https://issues.apache.org/jira/browse/FLINK-19515
ic.setup(). <- sets ic.context = some value
await ....
ic.complete() <- sets ic.context = None
As a result ic.context can be None depending on how the coros yield/awaken resulting in
line 57, in complete
self.add_mutations(context, invocation_result)
File "/home/pi/.local/lib/python3.7/site-packages/statefun/request_reply.py", line 82, in add_mutations
for name, handle in context.states.items():
AttributeError: 'NoneType' object has no attribute 'states'
> Add Async RequestReply handler for the Python SDK
> -------------------------------------------------
>
> Key: FLINK-18518
> URL: https://issues.apache.org/jira/browse/FLINK-18518
> Project: Flink
> Issue Type: New Feature
> Components: Stateful Functions
> Affects Versions: statefun-2.1.0
> Reporter: Igal Shilman
> Assignee: Igal Shilman
> Priority: Major
> Labels: beginner-friendly, pull-request-available
> Fix For: statefun-2.2.0
>
>
> I/O bound stateful functions can benefit from the built-in async/io support in Python, but the
> RequestReply handler is not an async-io compatible. See [this|https://stackoverflow.com/questions/62640283/flink-stateful-functions-async-calls-with-the-python-sdk] question on stackoverflow.
>
> Having an asyncio compatible handler will open the door to the usage of aiohttp for example:
>
> {code:java}
> import aiohttp
> import asyncio
> ...
> async def fetch(session, url):
> async with session.get(url) as response:
> return await response.text()
> @function.bind("example/hello")
> async def hello(context, message):
> async with aiohttp.ClientSession() as session:
> html = await fetch(session, 'http://python.org')
> context.pack_and_reply(SomeProtobufMessage(html))
> from aiohttp import webhandler
> handler = AsyncRequestReplyHandler(functions)
> async def handle(request):
> req = await request.read()
> res = await handler(req)
> return web.Response(body=res, content_type="application/octet-stream'")
> app = web.Application()
> app.add_routes([web.post('/statefun', handle)])
> if __name__ == '__main__':
> web.run_app(app, port=5000)
> {code}
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)