Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/04 17:21:00 UTC

[GitHub] [beam] damccorm opened a new issue, #20434: Dataflow can't pickle WeakRefDictionary

damccorm opened a new issue, #20434:
URL: https://github.com/apache/beam/issues/20434

   We are trying to deploy a streaming pipeline to Dataflow that splits the data into a few different "routes", each of which is processed differently.
    We did all of the development with the DirectRunner, where it worked smoothly in our tests, but now that we have deployed it to Dataflow, it does not work.
   
    
   ```
   import logging

   import apache_beam as beam
   from apache_beam.pvalue import TaggedOutput


   class SplitByRoute(beam.DoFn):
       OUTPUT_TAG_ROUTE_ONE = "route_one"
       OUTPUT_TAG_ROUTE_TWO = "route_two"
       OUTPUT_NOT_SUPPORTED = "not_supported"

       def __init__(self):
           beam.DoFn.__init__(self)

       def process(self, elem):
           try:
               # Just tag it depending on param (define_route is not shown here)
               route = self.define_route(elem["param"])
           except Exception:
               route = None
           logging.info(f"Routed to {route}")
           if route == self.OUTPUT_TAG_ROUTE_ONE:
               yield TaggedOutput(self.OUTPUT_TAG_ROUTE_ONE, elem)
           elif route == self.OUTPUT_TAG_ROUTE_TWO:
               logging.info(f"Element: {elem}")
               yield TaggedOutput(self.OUTPUT_TAG_ROUTE_TWO, elem)
           else:
               yield TaggedOutput(self.OUTPUT_NOT_SUPPORTED, elem)
   ```
   
    
   
    
   
   The code fails when yielding from the DoFn above.

   It logs the element, yields the output, and then fails with the following error:
    `AttributeError: Can't pickle local object 'WeakValueDictionary.__init__.<locals>.remove' [while running 'generatedPtransform-3196']`
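For anyone trying to narrow this down locally, here is a minimal sketch of how the same kind of failure can be checked without Dataflow. It assumes the element is serialized with Python's standard `pickle` module somewhere between stages, which the error message suggests but the report does not confirm. The helper `pickling_error`, the nested dict standing in for the real object, and the `cache` key are all hypothetical.

```python
import pickle
import weakref


def pickling_error(obj):
    """Return the exception raised by pickle.dumps(obj), or None if it succeeds."""
    try:
        pickle.dumps(obj)
    except Exception as exc:  # the exact exception type varies by Python version
        return exc
    return None


# Hypothetical element: a plain dict stands in for the real object under 'param'.
anchor = frozenset({"kept alive"})  # strong reference so the weak entry survives
elem = {"param": {"cache": None}, "param2": "stuf"}
before = pickling_error(elem)  # nothing unpicklable attached yet

# Simulate a mutation that attaches a WeakValueDictionary to the element.
cache = weakref.WeakValueDictionary()
cache["k"] = anchor
elem["param"]["cache"] = cache
after = pickling_error(elem)

print("before mutation:", before)
print("after mutation:", repr(after))
```

Running a check like this on the element just before it is yielded may show which attribute becomes unpicklable. It does not prove that Dataflow takes the same serialization path.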
   
   Note that we also use tagged outputs earlier in the pipeline, before this DoFn, and those work on Dataflow; only this one fails with the error above.
   
   Any suggestions on how we could handle this? It has been a very frustrating error.
   
   Thank you!!! :)
   
    
   
    
   
   Edit:
   
   The problem is that, with the DataflowRunner, you cannot modify an object that was passed in as the message (the element in the example) and then emit it.
   
   For example
   ```
   class OurClass:

       def __init__(self):
           self.something = None

       def dosomething(self):
           self.something = 1
           self.other = 2


   elem = {
       'param': OurClass(),
       'param2': 'stuf'
   }
   ```
   
    
   
   So, in the code at the top, we used define_route to decide which route the data takes. But define_route called the dosomething method to determine the route, so the instance of the class was modified, and when the runner then tried to pickle this message, the code failed.
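One possible way around this, sketched under assumptions, is to compute the route on a copy so that the element that gets emitted is never modified. The `define_route` body and the `route_without_mutation` helper below are hypothetical (the real `define_route` is not shown in this report), and the sketch uses plain Python rather than a Beam DoFn so it can be run on its own.

```python
import copy


class OurClass:
    """Same shape as the class in the example above."""

    def __init__(self):
        self.something = None

    def dosomething(self):
        self.something = 1
        self.other = 2


def define_route(param):
    # Hypothetical routing helper: like the original, it mutates its argument.
    param.dosomething()
    return "route_one" if param.something == 1 else "not_supported"


def route_without_mutation(elem):
    """Return (route, elem), leaving elem exactly as it was received."""
    scratch = copy.deepcopy(elem["param"])  # mutate the copy, not the element
    try:
        route = define_route(scratch)
    except Exception:
        route = None
    return route, elem


elem = {"param": OurClass(), "param2": "stuf"}
route, out = route_without_mutation(elem)
print(route, vars(out["param"]))
```

This only helps if the object can be deep-copied before it is modified, and the copy adds a cost per element. Inside `process`, the same idea would be to call `define_route` on the copy and yield the original `elem`.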
   
   Imported from Jira [BEAM-10384](https://issues.apache.org/jira/browse/BEAM-10384). Original Jira may contain additional context.
   Reported by: mpoirrier.

