You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/04 22:30:29 UTC

[GitHub] [beam] damccorm opened a new issue, #21258: Dataflow error in CombinePerKey operation

damccorm opened a new issue, #21258:
URL: https://github.com/apache/beam/issues/21258

   This occurs in Dataflow when trying to deploy a workflow from Pubsub -> SlidingWindows -> beam.ParDo(KeyValues()) -> beam.GroupByKey -> beam.CombinePerKey:
   ```
   
   During handling of the above exception, another exception occurred:
   
   Traceback (most recent call last):
     File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line
   284, in _execute
       response = task()
     File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
   line 357, in <lambda>
       lambda: self.create_worker().do_instruction(request), request)
     File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
   line 602, in do_instruction
       getattr(request, request_type), request.instruction_id)
     File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
   line 633, in process_bundle
       instruction_id, request.process_bundle_descriptor_id)
     File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
   line 462, in get
       self.data_channel_factory)
     File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
   line 862, in __init__
       self.ops = self.create_execution_tree(self.process_bundle_descriptor)
    
   File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 919,
   in create_execution_tree
       descriptor.transforms, key=topological_height, reverse=True)])
     File
   "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 918, in
   <listcomp>
       get_operation(transform_id))) for transform_id in sorted(
     File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
   line 806, in wrapper
       result = cache[args] = func(*args)
     File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
   line 900, in get_operation
       pcoll_id in descriptor.transforms[transform_id].outputs.items()
     File
   "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 900, in
   <dictcomp>
       pcoll_id in descriptor.transforms[transform_id].outputs.items()
     File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
   line 898, in <listcomp>
       tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
     File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
   line 806, in wrapper
       result = cache[args] = func(*args)
     File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
   line 900, in get_operation
       pcoll_id in descriptor.transforms[transform_id].outputs.items()
     File
   "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 900, in
   <dictcomp>
       pcoll_id in descriptor.transforms[transform_id].outputs.items()
     File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
   line 898, in <listcomp>
       tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
     File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
   line 806, in wrapper
       result = cache[args] = func(*args)
     File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
   line 900, in get_operation
       pcoll_id in descriptor.transforms[transform_id].outputs.items()
     File
   "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 900, in
   <dictcomp>
       pcoll_id in descriptor.transforms[transform_id].outputs.items()
     File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
   line 898, in <listcomp>
       tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
     File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
   line 806, in wrapper
       result = cache[args] = func(*args)
     File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
   line 900, in get_operation
       pcoll_id in descriptor.transforms[transform_id].outputs.items()
     File
   "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 900, in
   <dictcomp>
       pcoll_id in descriptor.transforms[transform_id].outputs.items()
     File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
   line 898, in <listcomp>
       tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
     File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
   line 806, in wrapper
       result = cache[args] = func(*args)
     File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
   line 903, in get_operation
       transform_id, transform_consumers)
     File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
   line 1192, in create_operation
       return creator(self, transform_id, transform_proto, payload, consumers)
   
    File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line
   1776, in create_combine_per_key_convert_to_accumulators
       factory, transform_id, transform_proto,
   payload, consumers, 'convert')
     File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
   line 1798, in _create_combine_phase_operation
       factory.context), [], {}))
     File "/usr/local/lib/python3.7/site-packages/apache_beam/utils/urns.py",
   line 186, in from_runner_api
       proto_utils.parse_Bytes(fn_proto.payload, parameter_type), context)
   
    File "/usr/local/lib/python3.7/site-packages/apache_beam/utils/urns.py", line 160, in <lambda>
     
    unused_context: pickler.loads(proto.value))
     File "/usr/local/lib/python3.7/site-packages/apache_beam/internal/pickler.py",
   line 287, in loads
       return dill.loads(s)
     File "/usr/local/lib/python3.7/site-packages/dill/_dill.py",
   line 275, in loads
       return load(file, ignore, **kwds)
     File "/usr/local/lib/python3.7/site-packages/dill/_dill.py",
   line 270, in load
       return Unpickler(file, ignore=ignore, **kwds).load()
     File "/usr/local/lib/python3.7/site-packages/dill/_dill.py",
   line 472, in load
       obj = StockUnpickler.load(self)
   TypeError: code() takes at most 15 arguments (16 given)
   
   ```
   
   
   Imported from Jira [BEAM-12959](https://issues.apache.org/jira/browse/BEAM-12959). Original Jira may contain additional context.
   Reported by: eddiewang.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org