You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@beam.apache.org by "Beam JIRA Bot (Jira)" <ji...@apache.org> on 2021/01/12 17:13:03 UTC

[jira] [Updated] (BEAM-11158) Side Inputs to beam.Partition

     [ https://issues.apache.org/jira/browse/BEAM-11158?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Beam JIRA Bot updated BEAM-11158:
---------------------------------
    Priority: P3  (was: P2)

> Side Inputs to beam.Partition
> -----------------------------
>
>                 Key: BEAM-11158
>                 URL: https://issues.apache.org/jira/browse/BEAM-11158
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>            Reporter: Joseph Toth
>            Priority: P3
>              Labels: stale-P2
>
> Side inputs work with a regular ParDo and function, but I can't seem to get it to work with beam.Partition. The code and exception below demonstrates the problem.
> ```
> import apache_beam as beam
> def main():
>   class SideFn(beam.PartitionFn):
>     def partition_for(self, element, *args, **kwargs):
>       print(element, args, kwargs)
>   def just_print(element, *args, **kwargs):
>       print(element, args, kwargs)
>   with beam.Pipeline() as p:
>       side = p | 'CreateSide' >> beam.Create(['a'])
>       p | beam.Create([1, 2, 3]) | beam.Partition(SideFn(), 99,  side=beam.pvalue.AsSingleton(side))
>       # p | beam.Create([1, 2, 3]) | beam.ParDo(just_print, 99,  side=beam.pvalue.AsSingleton(side))
> if __name__ == '__main__':
>     main()
> ```
> /Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/bin/python /Users/joetoth/projects/joetoth.com/psy/part.py
> Traceback (most recent call last):
>   File "/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/internal/util.py", line 134, in <genexpr>
>     (k, next(v_iter)) if isinstance(v, ArgumentPlaceholder) else (k, v) for k,
> StopIteration
> The above exception was the direct cause of the following exception:
> Traceback (most recent call last):
>   File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 742, in apache_beam.runners.common.PerWindowInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 804, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
>   File "/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/internal/util.py", line 133, in insert_values_in_args
>     new_kwargs = dict(
> RuntimeError: generator raised StopIteration
> During handling of the above exception, another exception occurred:
> Traceback (most recent call last):
>   File "/Users/joetoth/projects/joetoth.com/psy/part.py", line 19, in <module>
>     main()
>   File "/Users/joetoth/projects/joetoth.com/psy/part.py", line 14, in main
>     p | beam.Create([1, 2, 3]) | beam.Partition(SideFn(), 99,  side=beam.pvalue.AsSingleton(side))
>   File "/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/pipeline.py", line 568, in __exit__
>     self.result = self.run()
>   File "/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/pipeline.py", line 547, in run
>     return self.runner.run_pipeline(self, self._options)
>   File "/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/runners/direct/direct_runner.py", line 119, in run_pipeline
>     return runner.run_pipeline(pipeline, options)
>   File "/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 175, in run_pipeline
>     self._latest_run_result = self.run_via_runner_api(
>   File "/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 186, in run_via_runner_api
>     return self.run_stages(stage_context, stages)
>   File "/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 344, in run_stages
>     stage_results = self._run_stage(
>   File "/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 527, in _run_stage
>     last_result, deferred_inputs, fired_timers = self._run_bundle(
>   File "/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 571, in _run_bundle
>     result, splits = bundle_manager.process_bundle(
>   File "/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 852, in process_bundle
>     result_future = self._worker_handler.control_conn.push(process_bundle_req)
>   File "/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", line 353, in push
>     response = self.worker.do_instruction(request)
>   File "/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 483, in do_instruction
>     return getattr(self, request_type)(
>   File "/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 519, in process_bundle
>     bundle_processor.process_bundle(instruction_id))
>   File "/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 984, in process_bundle
>     input_op_by_transform_id[element.transform_id].process_encoded(
>   File "/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 221, in process_encoded
>     self.output(decoded_value)
>   File "apache_beam/runners/worker/operations.py", line 354, in apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam/runners/worker/operations.py", line 356, in apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam/runners/worker/operations.py", line 218, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 703, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/worker/operations.py", line 704, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/common.py", line 1215, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 1279, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 569, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam/runners/worker/operations.py", line 218, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 703, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/worker/operations.py", line 704, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/common.py", line 1215, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 1279, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 569, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam/runners/worker/operations.py", line 218, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 703, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/worker/operations.py", line 704, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/common.py", line 1215, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 1279, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 569, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam/runners/worker/operations.py", line 218, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 703, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/worker/operations.py", line 704, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/common.py", line 1215, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 1294, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback
>     raise exc.with_traceback(traceback)
>   File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 742, in apache_beam.runners.common.PerWindowInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 804, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
>   File "/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/internal/util.py", line 133, in insert_values_in_args
>     new_kwargs = dict(
> RuntimeError: generator raised StopIteration [while running 'Partition(SideFn)/ParDo(ApplyPartitionFnFn)/ParDo(ApplyPartitionFnFn)']
> Process finished with exit code 1



--
This message was sent by Atlassian Jira
(v8.3.4#803005)