You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/04 22:30:29 UTC
[GitHub] [beam] damccorm opened a new issue, #21258: Dataflow error in CombinePerKey operation
damccorm opened a new issue, #21258:
URL: https://github.com/apache/beam/issues/21258
This occurs in Dataflow when trying to deploy a workflow from Pubsub -> SlidingWindows -> beam.ParDo(KeyValues()) -> beam.GroupByKey -> beam.CombinePerKey:
```
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line
284, in _execute
response = task()
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 357, in <lambda>
lambda: self.create_worker().do_instruction(request), request)
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 602, in do_instruction
getattr(request, request_type), request.instruction_id)
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 633, in process_bundle
instruction_id, request.process_bundle_descriptor_id)
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 462, in get
self.data_channel_factory)
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 862, in __init__
self.ops = self.create_execution_tree(self.process_bundle_descriptor)
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 919,
in create_execution_tree
descriptor.transforms, key=topological_height, reverse=True)])
File
"/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 918, in
<listcomp>
get_operation(transform_id))) for transform_id in sorted(
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 806, in wrapper
result = cache[args] = func(*args)
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 900, in get_operation
pcoll_id in descriptor.transforms[transform_id].outputs.items()
File
"/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 900, in
<dictcomp>
pcoll_id in descriptor.transforms[transform_id].outputs.items()
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 898, in <listcomp>
tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 806, in wrapper
result = cache[args] = func(*args)
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 900, in get_operation
pcoll_id in descriptor.transforms[transform_id].outputs.items()
File
"/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 900, in
<dictcomp>
pcoll_id in descriptor.transforms[transform_id].outputs.items()
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 898, in <listcomp>
tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 806, in wrapper
result = cache[args] = func(*args)
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 900, in get_operation
pcoll_id in descriptor.transforms[transform_id].outputs.items()
File
"/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 900, in
<dictcomp>
pcoll_id in descriptor.transforms[transform_id].outputs.items()
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 898, in <listcomp>
tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 806, in wrapper
result = cache[args] = func(*args)
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 900, in get_operation
pcoll_id in descriptor.transforms[transform_id].outputs.items()
File
"/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 900, in
<dictcomp>
pcoll_id in descriptor.transforms[transform_id].outputs.items()
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 898, in <listcomp>
tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 806, in wrapper
result = cache[args] = func(*args)
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 903, in get_operation
transform_id, transform_consumers)
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 1192, in create_operation
return creator(self, transform_id, transform_proto, payload, consumers)
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line
1776, in create_combine_per_key_convert_to_accumulators
factory, transform_id, transform_proto,
payload, consumers, 'convert')
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 1798, in _create_combine_phase_operation
factory.context), [], {}))
File "/usr/local/lib/python3.7/site-packages/apache_beam/utils/urns.py",
line 186, in from_runner_api
proto_utils.parse_Bytes(fn_proto.payload, parameter_type), context)
File "/usr/local/lib/python3.7/site-packages/apache_beam/utils/urns.py", line 160, in <lambda>
unused_context: pickler.loads(proto.value))
File "/usr/local/lib/python3.7/site-packages/apache_beam/internal/pickler.py",
line 287, in loads
return dill.loads(s)
File "/usr/local/lib/python3.7/site-packages/dill/_dill.py",
line 275, in loads
return load(file, ignore, **kwds)
File "/usr/local/lib/python3.7/site-packages/dill/_dill.py",
line 270, in load
return Unpickler(file, ignore=ignore, **kwds).load()
File "/usr/local/lib/python3.7/site-packages/dill/_dill.py",
line 472, in load
obj = StockUnpickler.load(self)
TypeError: code() takes at most 15 arguments (16 given)
```
Imported from Jira [BEAM-12959](https://issues.apache.org/jira/browse/BEAM-12959). Original Jira may contain additional context.
Reported by: eddiewang.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscribe@beam.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org