Posted to dev@beam.apache.org by Micah Wylde <mw...@lyft.com> on 2018/09/18 00:01:53 UTC

Beam python sdk with gevent

Hi all,

We're using the Python SDK with the portable Flink runner and running into
some problems integrating gevent. We're patching the gRPC runtime for
gevent as described in [0], which allows pipelines to start and partially
run. However, the tasks produce a stream of gevent exceptions:

Exception greenlet.error: error('cannot switch to a different thread',) in 'grpc._cython.cygrpc.run_loop' ignored
Traceback (most recent call last):
  File "src/gevent/event.py", line 240, in gevent._event.Event.wait
  File "src/gevent/event.py", line 140, in gevent._event._AbstractLinkable._wait
  File "src/gevent/event.py", line 117, in gevent._event._AbstractLinkable._wait_core
  File "src/gevent/event.py", line 119, in gevent._event._AbstractLinkable._wait_core
  File "src/gevent/_greenlet_primitives.py", line 59, in gevent.__greenlet_primitives.SwitchOutGreenletWithLoop.switch
  File "src/gevent/_greenlet_primitives.py", line 59, in gevent.__greenlet_primitives.SwitchOutGreenletWithLoop.switch
  File "src/gevent/_greenlet_primitives.py", line 63, in gevent.__greenlet_primitives.SwitchOutGreenletWithLoop.switch
  File "src/gevent/__greenlet_primitives.pxd", line 35, in gevent.__greenlet_primitives._greenlet_switch
greenlet.error: cannot switch to a different thread

and do not make any progress.

Has anybody else successfully used the portable Python SDK with gevent? Or
is there a recommended alternative for doing async IO in Python pipelines?

[0] https://github.com/grpc/grpc/issues/4629#issuecomment-376962677

Micah

Re: Beam python sdk with gevent

Posted by Ahmet Altay <al...@google.com>.
Thank you, Rakesh! Let us know how it goes if you get a chance to try
multiprocessing.

On Tue, Sep 25, 2018 at 9:19 AM, Rakesh Kumar <ra...@lyft.com> wrote:

> Hi Ahmet,
>
> I filed the jira ticket https://issues.apache.org/jira/browse/BEAM-5497.
> Let me know if you need anything else from us.
>
> Thank you,
> Rakesh

Re: Beam python sdk with gevent

Posted by Rakesh Kumar <ra...@lyft.com>.
Hi Ahmet,

I filed the jira ticket https://issues.apache.org/jira/browse/BEAM-5497.
Let me know if you need anything else from us.

Thank you,
Rakesh

On Mon, Sep 24, 2018 at 5:08 PM Ahmet Altay <al...@google.com> wrote:

> On Sun, Sep 23, 2018 at 9:21 PM, Rakesh Kumar <ra...@lyft.com>
> wrote:
>
>> Thanks, Ahmet, for providing the reference code. I will give it a try.
>>
>> I also read the code, and it looks like you are using multiprocessing to
>> parallelize runtime jobs. We wanted to use gevent because it is
>> lightweight and well suited to parallelizing IO- and network-bound jobs.
>>
>
> We use this code for IO-bound operations. For example, in [1] it is used
> to make calls into GCS in parallel, with batches of files.
>
> [1]
> https://github.com/apache/beam/blob/7bd73a51b670755bbb19e1291003722d5d16bdc5/sdks/python/apache_beam/io/filebasedsink.py#L313
>
>
>> I would also recommend providing gevent support in the future, because it
>> uses resources efficiently and scales well under heavy load.
>>
>
> Do you mind filing a JIRA for the gevent issue so that we can keep track
> of it?
>
Rakesh Kumar
Software Engineer
510-761-1364 |

<https://www.lyft.com/>

Re: Beam python sdk with gevent

Posted by Ahmet Altay <al...@google.com>.
On Sun, Sep 23, 2018 at 9:21 PM, Rakesh Kumar <ra...@lyft.com> wrote:

> Thanks, Ahmet, for providing the reference code. I will give it a try.
>
> I also read the code, and it looks like you are using multiprocessing to
> parallelize runtime jobs. We wanted to use gevent because it is
> lightweight and well suited to parallelizing IO- and network-bound jobs.
>

We use this code for IO-bound operations. For example, in [1] it is used to
make calls into GCS in parallel, with batches of files.

[1]
https://github.com/apache/beam/blob/7bd73a51b670755bbb19e1291003722d5d16bdc5/sdks/python/apache_beam/io/filebasedsink.py#L313


> I would also recommend providing gevent support in the future, because it
> uses resources efficiently and scales well under heavy load.
>

Do you mind filing a JIRA for the gevent issue so that we can keep track of
it?



Re: Beam python sdk with gevent

Posted by Rakesh Kumar <ra...@lyft.com>.
Thanks, Ahmet, for providing the reference code. I will give it a try.

I also read the code, and it looks like you are using multiprocessing to
parallelize runtime jobs. We wanted to use gevent because it is
lightweight and well suited to parallelizing IO- and network-bound jobs. I
would also recommend providing gevent support in the future, because it
uses resources efficiently and scales well under heavy load.



On Fri, Sep 21, 2018 at 5:58 PM Ahmet Altay <al...@google.com> wrote:

> Thank you for the example, it helps.
>
> I still do not know what is wrong with gevent. Would you consider using the
> multiprocessing package? We already use it to accomplish something similar
> in file-based sinks, and there is already a utility function that wraps it,
> similar to your example [1].
>
> [1]
> https://github.com/apache/beam/blob/59c85b44d156bb7b4462d80fcb5591f860235708/sdks/python/apache_beam/internal/util.py#L117
>
> Ahmet
>
Rakesh Kumar
Software Engineer
510-761-1364 |

<https://www.lyft.com/>

Re: Beam python sdk with gevent

Posted by Ahmet Altay <al...@google.com>.
Thank you for the example, it helps.

I still do not know what is wrong with gevent. Would you consider using the
multiprocessing package? We already use it to accomplish something similar
in file-based sinks, and there is already a utility function that wraps it,
similar to your example [1].

[1]
https://github.com/apache/beam/blob/59c85b44d156bb7b4462d80fcb5591f860235708/sdks/python/apache_beam/internal/util.py#L117

Ahmet
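
A rough sketch of the thread-pool approach suggested above, for readers who
want something concrete to try. It uses only the standard library's
multiprocessing.pool.ThreadPool. The names run_in_thread_pool and
fetch_batch are hypothetical and are not Beam APIs; the utility linked in
[1] may differ in its details.

```python
from multiprocessing.pool import ThreadPool


def run_in_thread_pool(fn, inputs, pool_size):
    """Apply fn to every input on a bounded pool of threads (hypothetical helper)."""
    inputs = list(inputs)
    if not inputs:
        return []
    # Never start more threads than there are inputs.
    pool = ThreadPool(min(pool_size, len(inputs)))
    try:
        # map() blocks until every call returns and preserves input order.
        return pool.map(fn, inputs)
    finally:
        pool.terminate()


def fetch_batch(batch):
    # Placeholder for an IO-bound call, e.g. a request to an internal service.
    return [item.upper() for item in batch]


if __name__ == "__main__":
    batches = [["a", "b"], ["c"], ["d", "e", "f"]]
    print(run_in_thread_pool(fetch_batch, batches, pool_size=2))
```

Threads release the GIL while blocked on network IO, so a pool like this
can overlap network calls without monkey-patching the process.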


On Wed, Sep 19, 2018 at 10:25 PM, Rakesh Kumar <ra...@lyft.com> wrote:

>
> We use gevent <http://www.gevent.org/> to make network calls in parallel.
> One of our transformation methods calls internal services and issues
> multiple network calls in parallel. Here is the code snippet:
> /__init__.py
> import gevent.monkey
> gevent.monkey.patch_all()
>
> /transform.py
> from gevent import Greenlet
> from gevent import joinall
> def filter_out_invalid_users(events):
>    key, user_id_data_pairs = events
>    user_ids = [user_id for user_id, data in user_id_data_pairs]
>
>    jobs = []
>    id_chunks = utils.chunk_list_evenly(user_ids, BATCH_SIZE)
>    for id_chunk in id_chunks:
>       # _call_users_service makes the network call.
>       jobs.append(Greenlet.spawn(_call_users_service, list(id_chunk)))
>
>    # Increase the timeout based on the number of greenlets we are running,
>    # to account for yielding among greenlets.
>    join_timeout = GREENLET_TIMEOUT + len(jobs) * GREENLET_TIMEOUT * 0.1
>    joinall(jobs, timeout=join_timeout)
>
>    successful_jobs = [job for job in jobs if job.successful()]
>    valid_user_ids = []
>    for job in successful_jobs:
>       network_response = job.get()
>       valid_user_ids.append(network_response.user_id)
>    yield valid_user_ids
>
> def _call_users_service(user_ids):
>    # make network call and return response
>    ..
>    ..
>    return network_response

Re: Beam python sdk with gevent

Posted by Rakesh Kumar <ra...@lyft.com>.
We use gevent <http://www.gevent.org/> to make network calls in parallel.
One of our transformation methods calls internal services and issues
multiple network calls in parallel. Here is the code snippet:
/__init__.py
import gevent.monkey
gevent.monkey.patch_all()

/transform.py
from gevent import Greenlet
from gevent import joinall
def filter_out_invalid_users(events):
   key, user_id_data_pairs = events
   user_ids = [user_id for user_id, data in user_id_data_pairs]

   jobs = []
   id_chunks = utils.chunk_list_evenly(user_ids, BATCH_SIZE)
   for id_chunk in id_chunks:
      # _call_users_service makes the network call.
      jobs.append(Greenlet.spawn(_call_users_service, list(id_chunk)))

   # Increase the timeout based on the number of greenlets we are running,
   # to account for yielding among greenlets.
   join_timeout = GREENLET_TIMEOUT + len(jobs) * GREENLET_TIMEOUT * 0.1
   joinall(jobs, timeout=join_timeout)

   successful_jobs = [job for job in jobs if job.successful()]
   valid_user_ids = []
   for job in successful_jobs:
      network_response = job.get()
      valid_user_ids.append(network_response.user_id)
   yield valid_user_ids

def _call_users_service(user_ids):
   # make network call and return response
   ..
   ..
   return network_response
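
For comparison, the same fan-out can be sketched without gevent using only
the standard library. This is a sketch under stated assumptions and not the
pipeline's actual code: chunk_evenly is a hypothetical stand-in for
utils.chunk_list_evenly (whose real behavior is not shown here),
call_users_service is a stub in place of the network call, and BATCH_SIZE,
CALL_TIMEOUT, and MAX_WORKERS are made-up values.

```python
from concurrent.futures import ThreadPoolExecutor, wait

BATCH_SIZE = 3      # assumed value, for illustration only
CALL_TIMEOUT = 2.0  # assumed per-call timeout in seconds
MAX_WORKERS = 8     # assumed cap on concurrent calls


def chunk_evenly(items, size):
    """Hypothetical stand-in for utils.chunk_list_evenly: fixed-size slices."""
    return [items[i:i + size] for i in range(0, len(items), size)]


def call_users_service(user_ids):
    # Stub for the real network call: pretend that even ids are valid.
    return [uid for uid in user_ids if uid % 2 == 0]


def filter_valid_user_ids(user_ids):
    chunks = chunk_evenly(user_ids, BATCH_SIZE)
    if not chunks:
        return []
    # Same scaling as the snippet above: base timeout plus 10% per job.
    join_timeout = CALL_TIMEOUT + len(chunks) * CALL_TIMEOUT * 0.1
    executor = ThreadPoolExecutor(max_workers=min(len(chunks), MAX_WORKERS))
    try:
        futures = [executor.submit(call_users_service, c) for c in chunks]
        done, _not_done = wait(futures, timeout=join_timeout)
        valid = []
        # Walk futures in submission order; keep only calls that finished cleanly.
        for future in futures:
            if future in done and future.exception() is None:
                valid.extend(future.result())
        return valid
    finally:
        # Do not block on stragglers that exceeded the timeout.
        executor.shutdown(wait=False)
```

With three chunks and a 2-second base timeout, the join timeout works out
to 2.0 + 3 * 2.0 * 0.1, or about 2.6 seconds. As in the gevent version,
calls that fail or miss the deadline are skipped.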

On Tue, Sep 18, 2018 at 7:07 PM Ahmet Altay <al...@google.com> wrote:

> I am also not familiar with gevent. Could you explain what you are trying
> to do and how you plan to use gevent?
>
> On Tue, Sep 18, 2018 at 9:38 AM, Lukasz Cwik <lc...@google.com> wrote:
>
>> I don't think anyone has tried what your doing. The code that your
>> working with is very new.
--
Rakesh Kumar
Software Engineer
510-761-1364

Re: Beam python sdk with gevent

Posted by Ahmet Altay <al...@google.com>.
I am also not familiar with gevent. Could you explain what you are trying
to do and how you plan to use gevent?

On Tue, Sep 18, 2018 at 9:38 AM, Lukasz Cwik <lc...@google.com> wrote:

> I don't think anyone has tried what you're doing. The code that you're
> working with is very new.

Re: Beam python sdk with gevent

Posted by Lukasz Cwik <lc...@google.com>.
I don't think anyone has tried what you're doing. The code that you're
working with is very new.
