You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@beam.apache.org by "Brian Hulette (Jira)" <ji...@apache.org> on 2021/09/27 18:32:00 UTC

[jira] [Commented] (BEAM-12959) Dataflow error in CombinePerKey operation

    [ https://issues.apache.org/jira/browse/BEAM-12959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17420964#comment-17420964 ] 

Brian Hulette commented on BEAM-12959:
--------------------------------------

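This looks like an object that was improperly pickled. It was pickled as a call to the {{code}} constructor with 16 arguments, but the {{code}} type on the worker only accepts 15. Python 3.8 added a positional-only argument count to code objects (raising the constructor's arity from 15 to 16), and the traceback shows the worker running Python 3.7. So a likely cause is that the pipeline was pickled under Python 3.8+ and unpickled by a Python 3.7 worker.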

Do you know what {{code()}} is referring to (e.g. is it from one of your types or a Beam type)? It would be helpful if you could share the CombineFn you're using.

> Dataflow error in CombinePerKey operation
> -----------------------------------------
>
>                 Key: BEAM-12959
>                 URL: https://issues.apache.org/jira/browse/BEAM-12959
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>    Affects Versions: 2.30.0, 2.31.0, 2.32.0
>            Reporter: Eddie Wang
>            Priority: P0
>
> This error occurs on Dataflow when deploying a pipeline of the form Pub/Sub -> SlidingWindows -> beam.ParDo(KeyValues()) -> beam.GroupByKey -> beam.CombinePerKey:
> {code:bash}
> During handling of the above exception, another exception occurred:
> Traceback (most recent call last):
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 284, in _execute
>     response = task()
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 357, in <lambda>
>     lambda: self.create_worker().do_instruction(request), request)
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 602, in do_instruction
>     getattr(request, request_type), request.instruction_id)
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 633, in process_bundle
>     instruction_id, request.process_bundle_descriptor_id)
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 462, in get
>     self.data_channel_factory)
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 862, in __init__
>     self.ops = self.create_execution_tree(self.process_bundle_descriptor)
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 919, in create_execution_tree
>     descriptor.transforms, key=topological_height, reverse=True)])
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 918, in <listcomp>
>     get_operation(transform_id))) for transform_id in sorted(
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 806, in wrapper
>     result = cache[args] = func(*args)
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 900, in get_operation
>     pcoll_id in descriptor.transforms[transform_id].outputs.items()
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 900, in <dictcomp>
>     pcoll_id in descriptor.transforms[transform_id].outputs.items()
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 898, in <listcomp>
>     tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 806, in wrapper
>     result = cache[args] = func(*args)
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 900, in get_operation
>     pcoll_id in descriptor.transforms[transform_id].outputs.items()
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 900, in <dictcomp>
>     pcoll_id in descriptor.transforms[transform_id].outputs.items()
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 898, in <listcomp>
>     tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 806, in wrapper
>     result = cache[args] = func(*args)
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 900, in get_operation
>     pcoll_id in descriptor.transforms[transform_id].outputs.items()
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 900, in <dictcomp>
>     pcoll_id in descriptor.transforms[transform_id].outputs.items()
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 898, in <listcomp>
>     tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 806, in wrapper
>     result = cache[args] = func(*args)
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 900, in get_operation
>     pcoll_id in descriptor.transforms[transform_id].outputs.items()
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 900, in <dictcomp>
>     pcoll_id in descriptor.transforms[transform_id].outputs.items()
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 898, in <listcomp>
>     tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 806, in wrapper
>     result = cache[args] = func(*args)
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 903, in get_operation
>     transform_id, transform_consumers)
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1192, in create_operation
>     return creator(self, transform_id, transform_proto, payload, consumers)
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1776, in create_combine_per_key_convert_to_accumulators
>     factory, transform_id, transform_proto, payload, consumers, 'convert')
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1798, in _create_combine_phase_operation
>     factory.context), [], {}))
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/utils/urns.py", line 186, in from_runner_api
>     proto_utils.parse_Bytes(fn_proto.payload, parameter_type), context)
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/utils/urns.py", line 160, in <lambda>
>     unused_context: pickler.loads(proto.value))
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/internal/pickler.py", line 287, in loads
>     return dill.loads(s)
>   File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 275, in loads
>     return load(file, ignore, **kwds)
>   File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 270, in load
>     return Unpickler(file, ignore=ignore, **kwds).load()
>   File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 472, in load
>     obj = StockUnpickler.load(self)
> TypeError: code() takes at most 15 arguments (16 given)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)