Posted to issues@beam.apache.org by "Kenneth Knowles (Jira)" <ji...@apache.org> on 2021/03/15 21:42:00 UTC

[jira] [Updated] (BEAM-10384) Dataflow can't pickle WeakRefDictionary

     [ https://issues.apache.org/jira/browse/BEAM-10384?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kenneth Knowles updated BEAM-10384:
-----------------------------------
    Priority: P2  (was: P0)

> Dataflow can't pickle WeakRefDictionary
> ---------------------------------------
>
>                 Key: BEAM-10384
>                 URL: https://issues.apache.org/jira/browse/BEAM-10384
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-dataflow
>    Affects Versions: 2.22.0
>         Environment: Ubuntu LTS (direct runner)
>            Reporter: Maurice Poirrier
>            Priority: P2
>              Labels: GCP
>             Fix For: Not applicable
>
>   Original Estimate: 120h
>  Remaining Estimate: 120h
>
> We are trying to deploy a streaming pipeline to Dataflow that splits the data into a few different "routes", each of which processes the data differently.
>  We did the complete development with the DirectRunner, and it worked smoothly in our tests, but now that we have deployed it to Dataflow, it does not work.
>  
> {code:python}
> import logging
>
> import apache_beam as beam
> from apache_beam.pvalue import TaggedOutput
>
>
> class SplitByRoute(beam.DoFn):
>     OUTPUT_TAG_ROUTE_ONE = "route_one"
>     OUTPUT_TAG_ROUTE_TWO = "route_two"
>     OUTPUT_NOT_SUPPORTED = "not_supported"
>
>     def __init__(self):
>         super().__init__()
>
>     def process(self, elem):
>         try:
>             # define_route (not shown) tags the element depending on "param"
>             route = self.define_route(elem["param"])
>         except Exception:
>             route = None
>         logging.info(f"Routed to {route}")
>         if route == self.OUTPUT_TAG_ROUTE_ONE:
>             yield TaggedOutput(self.OUTPUT_TAG_ROUTE_ONE, elem)
>         elif route == self.OUTPUT_TAG_ROUTE_TWO:
>             logging.info(f"Element: {elem}")
>             yield TaggedOutput(self.OUTPUT_TAG_ROUTE_TWO, elem)
>         else:
>             yield TaggedOutput(self.OUTPUT_NOT_SUPPORTED, elem)
> {code}
>  
>  
> The code fails when yielding from the DoFn above.
>
> It logs the element and yields the output, then fails with the following error:
>  `AttributeError: Can't pickle local object 'WeakValueDictionary.__init__.<locals>.remove' [while running 'generatedPtransform-3196']`
> We also use tagged outputs earlier in the pipeline, before this DoFn, and they work on Dataflow. Only this DoFn fails, with the error above.
> Any suggestions on how we could handle this? It has been a very frustrating error.
> Thank you!!! :)
>  
>  
> Edit:
> The problem is that, with the DataflowRunner, we cannot modify an object that is passed as part of the message (the element in the example) and then emit it.
> For example:
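> {code:python}
> class OurClass:
>     def __init__(self):
>         self.something = None
>
>     def dosomething(self):
>         # Mutates the instance in place, adding a new attribute.
>         self.something = 1
>         self.other = 2
>
>
> # The class must be defined before it is instantiated here.
> elem = {
>     'param': OurClass(),
>     'param2': 'stuf'
> }
> {code}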
>  
> So, in the code at the top, we used define_route to decide which route the data takes. But inside define_route we called the dosomething method to determine the route. As a result, the class instance was modified, and when the runner tried to pickle this message, the code failed.
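
The traceback names a `weakref.WeakValueDictionary`, which suggests the emitted element ends up holding one after `define_route` runs. The sketch below reproduces this outside Beam. It relies on two assumptions that the issue does not confirm. First, the element is serialized with the standard library `pickle` module, which is where the "Can't pickle local object" message comes from. Second, the real `dosomething` stores a WeakValueDictionary-backed helper on the instance. The `_cache` attribute and the `is_picklable` helper are hypothetical.

```python
import pickle
import weakref


class OurClass:
    """Hypothetical stand-in for the reporter's element class."""

    def __init__(self):
        self.something = None

    def dosomething(self):
        # Assumption: the real method also stores a helper on the instance
        # that wraps a WeakValueDictionary (for example, a cache).
        self.something = 1
        self._cache = weakref.WeakValueDictionary()


def is_picklable(obj):
    """Return True if the standard pickle module can serialize obj."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        # The exception type raised for unpicklable objects varies
        # between Python versions.
        return False


elem = {"param": OurClass(), "param2": "stuf"}
print(is_picklable(elem))  # True: the element is untouched

elem["param"].dosomething()  # mutate the element in place
print(is_picklable(elem))  # False: it now holds a WeakValueDictionary
```

The exact exception type and message differ between Python versions, which is why the helper catches several types.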

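One way to avoid the failure is to compute the route on a private copy and emit the original element unchanged. This follows the Beam programming guide's advice not to modify the element passed to `process`. The sketch below again assumes a hypothetical `_cache` attribute added by `dosomething`. The module-level `define_route` shown here is illustrative and is not the reporter's method.

```python
import copy
import pickle
import weakref


class OurClass:
    """Hypothetical stand-in for the reporter's element class."""

    def __init__(self):
        self.something = None

    def dosomething(self):
        # Assumption: the real method caches an unpicklable helper.
        self.something = 1
        self._cache = weakref.WeakValueDictionary()


def define_route(param):
    """Hypothetical routing helper that never mutates its input.

    It runs dosomething() on a private deep copy, so the element that is
    later emitted downstream keeps its original, picklable state.
    """
    scratch = copy.deepcopy(param)
    scratch.dosomething()
    return "route_one" if scratch.something == 1 else "route_two"


elem = {"param": OurClass(), "param2": "stuf"}
route = define_route(elem["param"])
payload = pickle.dumps(elem)  # succeeds: elem was never modified
print(route)  # route_one
```

If the instance really must be mutated, another option is a `__getstate__` method on the class that drops the unpicklable attribute before pickling. Copying has the advantage of leaving the class definition untouched.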


--
This message was sent by Atlassian Jira
(v8.3.4#803005)