Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/04 17:21:00 UTC
[GitHub] [beam] damccorm opened a new issue, #20434: Dataflow can't pickle WeakRefDictionary
damccorm opened a new issue, #20434:
URL: https://github.com/apache/beam/issues/20434
We are trying to deploy a streaming pipeline to Dataflow in which we split the data into a few different "routes" and process each route differently.
We did all of the development with the DirectRunner, and it worked smoothly in our tests, but now that we have deployed it to Dataflow, it does not work.
```
import logging

import apache_beam as beam
from apache_beam.pvalue import TaggedOutput


class SplitByRoute(beam.DoFn):
    OUTPUT_TAG_ROUTE_ONE = "route_one"
    OUTPUT_TAG_ROUTE_TWO = "route_two"
    OUTPUT_NOT_SUPPORTED = "not_supported"

    def __init__(self):
        beam.DoFn.__init__(self)

    def process(self, elem):
        try:
            # define_route (not shown) just tags the element depending on param.
            route = self.define_route(elem["param"])
        except Exception:
            route = None

        logging.info(f"Routed to {route}")
        if route == self.OUTPUT_TAG_ROUTE_ONE:
            yield TaggedOutput(self.OUTPUT_TAG_ROUTE_ONE, elem)
        elif route == self.OUTPUT_TAG_ROUTE_TWO:
            logging.info(f"Element: {elem}")
            yield TaggedOutput(self.OUTPUT_TAG_ROUTE_TWO, elem)
        else:
            yield TaggedOutput(self.OUTPUT_NOT_SUPPORTED, elem)
```
The code fails when yielding in the DoFn above.
It logs the element, yields the output, and then fails with the following error:
`AttributeError: Can't pickle local object 'WeakValueDictionary.__init__.<locals>.remove' [while running 'generatedPtransform-3196']`
Note that we also use TaggedOutputs earlier in the pipeline, before this DoFn, and those work on Dataflow; only this one fails with the error mentioned.
Any suggestions on how we could handle this? It has been a very frustrating error.
Thank you!!! :)
Edit:
The problem is that, with the DataflowRunner, you cannot modify an object that was passed in as the message (the element in the example) and then emit it.
For example:
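```
# The class is defined first so that the element below can reference it.
class OurClass:
    def __init__(self):
        self.something = None

    def dosomething(self):
        # Calling this method mutates the instance.
        self.something = 1
        self.other = 2


elem = {
    'param': OurClass(),
    'param2': 'stuf'
}
```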
So, in the code at the top, we used define_route to decide which route the data would take. But define_route called the dosomething method to determine the route, so the instance of the class was modified, and when the runner then tried to pickle the message, the code failed.
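The effect can be reproduced outside Beam with the standard `pickle` module. The sketch below is only an illustration under stated assumptions: `types.SimpleNamespace` stands in for `OurClass`, the helper `can_pickle` is hypothetical, and it assumes the mutation ends up storing something that holds a `WeakValueDictionary`, which is what the error message suggests. What the real `dosomething` stores is not shown in this report.

```python
import pickle
import types
import weakref


def can_pickle(obj):
    """Return True if the standard pickle module can serialize obj."""
    try:
        pickle.dumps(obj)
        return True
    except (AttributeError, TypeError, pickle.PicklingError):
        return False


# SimpleNamespace stands in for OurClass (a simplification for this sketch).
param = types.SimpleNamespace(something=None)
elem = {"param": param, "param2": "stuf"}
picklable_before = can_pickle(elem)

# Assumption: the routing call leaves state behind that holds a
# WeakValueDictionary, whose internal callback is a local function.
param.something = weakref.WeakValueDictionary()
picklable_after_mutation = can_pickle(elem)

# One possible workaround: clear the transient state before emitting.
param.something = None
picklable_after_reset = can_pickle(elem)

print(picklable_before, picklable_after_mutation, picklable_after_reset)
```

If this matches the real situation, keeping the routing logic free of side effects on the element, or resetting the transient attribute before yielding, may avoid the failure. That the DirectRunner did not serialize the element at this step is a guess consistent with the behavior described above, not something confirmed here.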
Imported from Jira [BEAM-10384](https://issues.apache.org/jira/browse/BEAM-10384). Original Jira may contain additional context.
Reported by: mpoirrier.