Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/03 22:52:20 UTC

[GitHub] [beam] kennknowles opened a new issue, #19170: Provide support for Gevent

kennknowles opened a new issue, #19170:
URL: https://github.com/apache/beam/issues/19170

   [Gevent](http://www.gevent.org/) is mainly used to make network calls in parallel. We use gevent in one of our transform methods to call internal services. Before using gevent, we monkey-patch the standard library as described [here](https://github.com/grpc/grpc/issues/4629#issuecomment-376962677).
   The transform method makes multiple network calls in parallel. Here is the code snippet:
   ```
   # __init__.py
   import gevent.monkey
   gevent.monkey.patch_all()

   # transform.py
   # utils, BATCH_SIZE and GREENLET_TIMEOUT are defined elsewhere (not shown).
   from gevent import Greenlet
   from gevent import joinall

   def filter_out_invalid_users(events):
       key, user_id_data_pairs = events
       user_ids = [user_id for user_id, data in user_id_data_pairs]

       jobs = []
       id_chunks = utils.chunk_list_evenly(user_ids, BATCH_SIZE)
       for id_chunk in id_chunks:
           # _call_users_service makes the network call.
           jobs.append(Greenlet.spawn(_call_users_service, list(id_chunk)))

       # Increase the timeout based on the number of greenlets we are running,
       # to account for yielding among greenlets.
       join_timeout = GREENLET_TIMEOUT + len(jobs) * GREENLET_TIMEOUT * 0.1
       joinall(jobs, timeout=join_timeout)

       successful_jobs = [job for job in jobs if job.successful()]
       valid_user_ids = []
       for job in successful_jobs:
           network_response = job.get()
           valid_user_ids.append(network_response.user_id)

       yield valid_user_ids

   def _call_users_service(user_ids):
       # Make the network call and return the response.
       ...
       return network_response
   ```
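The snippet relies on names the issue does not show (`utils.chunk_list_evenly`, `BATCH_SIZE`, `GREENLET_TIMEOUT`). A minimal sketch of how the fan-out size and the join timeout work out, assuming (hypothetically) that `chunk_list_evenly` splits the ids into consecutive chunks of at most `BATCH_SIZE` items; the helper below is an illustrative stand-in, not the reporter's implementation:

```python
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def chunk_list_evenly(items: Sequence[T], batch_size: int) -> List[List[T]]:
    """Hypothetical stand-in for the issue's utils.chunk_list_evenly.

    Assumes the second argument is the maximum chunk size and that the
    chunks are consecutive slices of the input.
    """
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    return [list(items[i:i + batch_size]) for i in range(0, len(items), batch_size)]


def join_timeout(greenlet_timeout: float, num_jobs: int) -> float:
    """The issue's heuristic: the base timeout plus 10% of it per greenlet."""
    return greenlet_timeout + num_jobs * greenlet_timeout * 0.1


user_ids = [f"user-{n}" for n in range(23)]
id_chunks = chunk_list_evenly(user_ids, 5)  # chunk sizes 5, 5, 5, 5, 3
print(len(id_chunks), join_timeout(10.0, len(id_chunks)))  # 5 15.0
```

The timeout therefore grows linearly as GREENLET_TIMEOUT × (1 + 0.1 × number of chunks): with a hypothetical GREENLET_TIMEOUT of 10 s and 23 ids in chunks of 5, `joinall` waits at most 15 s.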
   
   With this gevent-based transform, the pipeline starts and runs partially. However, the tasks then produce a stream of gevent exceptions and make no further progress:
   ```
   Exception greenlet.error: error('cannot switch to a different thread',) in 'grpc._cython.cygrpc.run_loop' ignored
   Traceback (most recent call last):
     File "src/gevent/event.py", line 240, in gevent._event.Event.wait
     File "src/gevent/event.py", line 140, in gevent._event._AbstractLinkable._wait
     File "src/gevent/event.py", line 117, in gevent._event._AbstractLinkable._wait_core
     File "src/gevent/event.py", line 119, in gevent._event._AbstractLinkable._wait_core
     File "src/gevent/_greenlet_primitives.py", line 59, in gevent.__greenlet_primitives.SwitchOutGreenletWithLoop.switch
     File "src/gevent/_greenlet_primitives.py", line 59, in gevent.__greenlet_primitives.SwitchOutGreenletWithLoop.switch
     File "src/gevent/_greenlet_primitives.py", line 63, in gevent.__greenlet_primitives.SwitchOutGreenletWithLoop.switch
     File "src/gevent/__greenlet_primitives.pxd", line 35, in gevent.__greenlet_primitives._greenlet_switch
   greenlet.error: cannot switch to a different thread
   ```
   
   The alternative is to use the `multiprocessing` module, as shown [here](https://github.com/apache/beam/blob/59c85b44d156bb7b4462d80fcb5591f860235708/sdks/python/apache_beam/internal/util.py#L117) and [here](https://github.com/apache/beam/blob/7bd73a51b670755bbb19e1291003722d5d16bdc5/sdks/python/apache_beam/io/filebasedsink.py#L313).
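The linked Beam helpers appear to fan work out over a thread pool from `multiprocessing.pool`. A minimal sketch of the same fan-out/join pattern as the gevent snippet, rewritten on `multiprocessing.pool.ThreadPool`; `call_users_service` is a hypothetical callable assumed to return the valid ids for one chunk (the issue's `_call_users_service` instead returns a response object with a `user_id`). Because the calls run on ordinary OS threads and nothing is monkey-patched, this should avoid greenlets being switched across threads, at the cost of one thread per concurrent call:

```python
import time
from multiprocessing.pool import ThreadPool
from typing import Callable, Iterable, List, Tuple


def filter_out_invalid_users_threaded(
    events: Tuple[str, Iterable[Tuple[str, object]]],
    call_users_service: Callable[[List[str]], List[str]],
    batch_size: int = 100,
    call_timeout: float = 5.0,
    pool_size: int = 16,
):
    """Thread-pool version of the greenlet fan-out from the issue (a sketch).

    call_users_service is a hypothetical stand-in for _call_users_service and
    is assumed to return the valid ids for one chunk of ids.
    """
    _key, user_id_data_pairs = events
    user_ids = [user_id for user_id, _data in user_id_data_pairs]
    id_chunks = [user_ids[i:i + batch_size] for i in range(0, len(user_ids), batch_size)]

    # Same heuristic as the issue: base timeout plus 10% of it per job.
    join_timeout = call_timeout + len(id_chunks) * call_timeout * 0.1
    deadline = time.monotonic() + join_timeout

    valid_user_ids: List[str] = []
    # Leaving the with-block calls pool.terminate(). Threads cannot be killed, so a
    # call that is still running finishes in the background and its result is dropped.
    with ThreadPool(pool_size) as pool:
        pending = [pool.apply_async(call_users_service, (chunk,)) for chunk in id_chunks]
        for result in pending:
            remaining = max(0.0, deadline - time.monotonic())
            try:
                valid_user_ids.extend(result.get(timeout=remaining))
            except Exception:
                # multiprocessing.TimeoutError (deadline passed) or an error raised
                # by the call: drop this chunk, like a greenlet that is not successful().
                continue
    yield valid_user_ids
```

Collecting every result against one shared deadline mirrors `joinall(jobs, timeout=...)`: chunks that fail or miss the deadline are dropped instead of failing the whole element.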
   
   Gevent is lightweight and well suited to parallelizing I/O- and network-bound jobs: it uses resources efficiently and scales well under heavy load.
    
   
   Imported from Jira [BEAM-5497](https://issues.apache.org/jira/browse/BEAM-5497). Original Jira may contain additional context.
   Reported by: rakeshkumar.

