Posted to issues@beam.apache.org by "Kenneth Knowles (Jira)" <ji...@apache.org> on 2021/03/15 21:43:00 UTC
[jira] [Commented] (BEAM-10384) Dataflow can't pickle WeakRefDictionary
[ https://issues.apache.org/jira/browse/BEAM-10384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17302029#comment-17302029 ]
Kenneth Knowles commented on BEAM-10384:
----------------------------------------
It is actually not safe to modify an element. The same element may be passed to many DoFns. If you mutate the element you may corrupt your output.
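The hazard described above can be modeled in plain Python, without Beam or a real runner. In this sketch the function names are hypothetical: one element object is handed to two consumers, and tagging it in place leaks the change to the other consumer, while returning a new dict does not.

```python
def tag_in_place(elem):
    """Unsafe: writes a new key into the element it received."""
    elem["route"] = "route_one"
    return elem


def tag_with_copy(elem):
    """Safer: returns a new dict and leaves the input untouched."""
    return {**elem, "route": "route_one"}


def keys_seen_by_other_consumer(tagger):
    """Model one element object shared by two consumers.

    The first consumer applies `tagger`; the second only inspects the
    same object afterwards and reports the keys it sees.
    """
    shared = {"param": 1, "param2": "stuf"}
    tagger(shared)
    return sorted(shared)


print(keys_seen_by_other_consumer(tag_in_place))   # ['param', 'param2', 'route']
print(keys_seen_by_other_consumer(tag_with_copy))  # ['param', 'param2']
```

Note that `{**elem, ...}` is only a shallow copy: nested objects such as the `OurClass` instance in the issue below are still shared, so changing those needs `copy.deepcopy` or, better, no mutation at all.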
> Dataflow can't pickle WeakRefDictionary
> ---------------------------------------
>
> Key: BEAM-10384
> URL: https://issues.apache.org/jira/browse/BEAM-10384
> Project: Beam
> Issue Type: New Feature
> Components: runner-dataflow
> Affects Versions: 2.22.0
> Environment: Ubuntu LTS (direct runner)
> Reporter: Maurice Poirrier
> Priority: P2
> Labels: GCP
> Original Estimate: 120h
> Remaining Estimate: 120h
>
> We are trying to deploy a streaming pipeline to Dataflow in which we split the data into a few different "routes" and process each route differently.
> We did the complete development with the DirectRunner, and it worked smoothly in our tests, but now that we have deployed it to Dataflow, it does not work.
>
> {code:python}
> import logging
>
> import apache_beam as beam
> from apache_beam.pvalue import TaggedOutput
>
>
> class SplitByRoute(beam.DoFn):
>     OUTPUT_TAG_ROUTE_ONE = "route_one"
>     OUTPUT_TAG_ROUTE_TWO = "route_two"
>     OUTPUT_NOT_SUPPORTED = "not_supported"
>
>     def __init__(self):
>         super().__init__()
>
>     def process(self, elem):
>         try:
>             # define_route (not shown) just tags the element depending on "param"
>             route = self.define_route(elem["param"])
>         except Exception:
>             route = None
>         logging.info(f"Routed to {route}")
>         if route == self.OUTPUT_TAG_ROUTE_ONE:
>             yield TaggedOutput(self.OUTPUT_TAG_ROUTE_ONE, elem)
>         elif route == self.OUTPUT_TAG_ROUTE_TWO:
>             logging.info(f"Element: {elem}")
>             yield TaggedOutput(self.OUTPUT_TAG_ROUTE_TWO, elem)
>         else:
>             yield TaggedOutput(self.OUTPUT_NOT_SUPPORTED, elem)
> {code}
>
>
> The code fails when yielding from the DoFn above.
>
> It logs the element, yields the output, and then fails with the following error:
> `AttributeError: Can't pickle local object 'WeakValueDictionary.__init__.<locals>.remove' [while running 'generatedPtransform-3196']`
> We also use tagged outputs earlier in the pipeline, before this DoFn, and they work on Dataflow; only this DoFn fails with the error above.
> Any suggestions on how we could handle this? It has been a very frustrating error.
> Thank you!!! :)
>
>
> Edit:
> The problem is that with the DataflowRunner you cannot modify an object that we pass as a message (elem in the example) and then emit it.
> For example:
> {code:python}
> class OurClass:
>     def __init__(self):
>         self.something = None
>
>     def dosomething(self):
>         self.something = 1
>         self.other = 2
>
> elem = {
>     'param': OurClass(),
>     'param2': 'stuf'
> }
> {code}
>
> In the code at the top, we used define_route to decide which route the data should take. Inside define_route we called the dosomething method to determine the route, so the instance of the class was modified, and when Dataflow then tried to pickle this message, the code failed.
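The issue does not show which attribute made the mutated object unpicklable; the error message only points to a WeakValueDictionary. The sketch below assumes, hypothetically, that dosomething() attaches one (the `registry` attribute is invented for illustration) and uses plain `pickle` as a stand-in for however the runner serializes elements. It routes on a deep copy so the element that is emitted stays unmodified.

```python
import copy
import pickle
import weakref


class OurClass:
    """Modeled on the reporter's class. The `registry` attribute is a
    hypothetical stand-in for the unpicklable state the issue implies."""

    def __init__(self):
        self.something = None

    def dosomething(self):
        self.something = 1
        self.other = 2
        # Hypothetical: a WeakValueDictionary keeps a callback defined inside
        # its own __init__ (the `remove` named in the error), and pickle
        # cannot serialize locally defined functions.
        self.registry = weakref.WeakValueDictionary()


def is_picklable(obj):
    """Return True if pickle can serialize obj."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        # The exception type for local functions differs across Python versions.
        return False
    return True


def define_route(param):
    """Hypothetical router: calls dosomething() on a private deep copy,
    so the element that gets emitted is never modified."""
    scratch = copy.deepcopy(param)
    scratch.dosomething()
    return "route_one" if scratch.something == 1 else "not_supported"


elem = {"param": OurClass(), "param2": "stuf"}
print(define_route(elem["param"]), is_picklable(elem))  # route_one True

# For contrast, mutating the element itself reproduces the failure.
elem["param"].dosomething()
print(is_picklable(elem))  # False
```

A deep copy per element has a cost; if routing only needs to read attributes, a side-effect-free check avoids both the copy and the mutation.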
--
This message was sent by Atlassian Jira
(v8.3.4#803005)