Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/04 18:07:38 UTC

[GitHub] [beam] damccorm opened a new issue, #20551: Side Inputs to beam.Partition

damccorm opened a new issue, #20551:
URL: https://github.com/apache/beam/issues/20551

   Side inputs work with a regular ParDo and function, but I can't seem to get them to work with beam.Partition. The code and exception below demonstrate the problem.
   
   ```
   import apache_beam as beam
   
   def main():
   
     class SideFn(beam.PartitionFn):
       def partition_for(self, element, *args, **kwargs):
         print(element, args, kwargs)
   
     def just_print(element, *args, **kwargs):
         print(element, args, kwargs)
   
     with beam.Pipeline() as p:
         side = p | 'CreateSide' >> beam.Create(['a'])
         p | beam.Create([1, 2, 3]) | beam.Partition(SideFn(), 99,  side=beam.pvalue.AsSingleton(side))
         # p | beam.Create([1, 2, 3]) | beam.ParDo(just_print, 99,  side=beam.pvalue.AsSingleton(side))
   
   
   if __name__ == '__main__':
       main()
   ```
   
   
   
   /Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/bin/python /Users/joetoth/projects/joetoth.com/psy/part.py
   Traceback (most recent call last):
     File "/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/internal/util.py", line 134, in <genexpr>
       (k, next(v_iter)) if isinstance(v, ArgumentPlaceholder) else (k, v) for k,
   StopIteration
   
   The above exception was the direct cause of the following exception:
   
   Traceback (most recent call last):
     File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
     File "apache_beam/runners/common.py", line 742, in apache_beam.runners.common.PerWindowInvoker.invoke_process
     File "apache_beam/runners/common.py", line 804, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
     File "/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/internal/util.py", line 133, in insert_values_in_args
       new_kwargs = dict(
   RuntimeError: generator raised StopIteration
   
   During handling of the above exception, another exception occurred:
   
   Traceback (most recent call last):
     File "/Users/joetoth/projects/joetoth.com/psy/part.py", line 19, in <module>
       main()
     File "/Users/joetoth/projects/joetoth.com/psy/part.py", line 14, in main
       p | beam.Create([1, 2, 3]) | beam.Partition(SideFn(), 99,  side=beam.pvalue.AsSingleton(side))
     File "/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/pipeline.py", line 568, in __exit__
       self.result = self.run()
     File "/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/pipeline.py", line 547, in run
       return self.runner.run_pipeline(self, self._options)
     File "/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/runners/direct/direct_runner.py", line 119, in run_pipeline
       return runner.run_pipeline(pipeline, options)
     File "/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 175, in run_pipeline
       self._latest_run_result = self.run_via_runner_api(
     File "/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 186, in run_via_runner_api
       return self.run_stages(stage_context, stages)
     File "/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 344, in run_stages
       stage_results = self._run_stage(
     File "/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 527, in _run_stage
       last_result, deferred_inputs, fired_timers = self._run_bundle(
     File "/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 571, in _run_bundle
       result, splits = bundle_manager.process_bundle(
     File "/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 852, in process_bundle
       result_future = self._worker_handler.control_conn.push(process_bundle_req)
     File "/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", line 353, in push
       response = self.worker.do_instruction(request)
     File "/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 483, in do_instruction
       return getattr(self, request_type)(
     File "/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 519, in process_bundle
       bundle_processor.process_bundle(instruction_id))
     File "/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 984, in process_bundle
       input_op_by_transform_id[element.transform_id].process_encoded(
     File "/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 221, in process_encoded
       self.output(decoded_value)
     File "apache_beam/runners/worker/operations.py", line 354, in apache_beam.runners.worker.operations.Operation.output
     File "apache_beam/runners/worker/operations.py", line 356, in apache_beam.runners.worker.operations.Operation.output
     File "apache_beam/runners/worker/operations.py", line 218, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
     File "apache_beam/runners/worker/operations.py", line 703, in apache_beam.runners.worker.operations.DoOperation.process
     File "apache_beam/runners/worker/operations.py", line 704, in apache_beam.runners.worker.operations.DoOperation.process
     File "apache_beam/runners/common.py", line 1215, in apache_beam.runners.common.DoFnRunner.process
     File "apache_beam/runners/common.py", line 1279, in apache_beam.runners.common.DoFnRunner._reraise_augmented
     File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
     File "apache_beam/runners/common.py", line 569, in apache_beam.runners.common.SimpleInvoker.invoke_process
     File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
     File "apache_beam/runners/worker/operations.py", line 218, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
     File "apache_beam/runners/worker/operations.py", line 703, in apache_beam.runners.worker.operations.DoOperation.process
     File "apache_beam/runners/worker/operations.py", line 704, in apache_beam.runners.worker.operations.DoOperation.process
     File "apache_beam/runners/common.py", line 1215, in apache_beam.runners.common.DoFnRunner.process
     File "apache_beam/runners/common.py", line 1279, in apache_beam.runners.common.DoFnRunner._reraise_augmented
     File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
     File "apache_beam/runners/common.py", line 569, in apache_beam.runners.common.SimpleInvoker.invoke_process
     File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
     File "apache_beam/runners/worker/operations.py", line 218, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
     File "apache_beam/runners/worker/operations.py", line 703, in apache_beam.runners.worker.operations.DoOperation.process
     File "apache_beam/runners/worker/operations.py", line 704, in apache_beam.runners.worker.operations.DoOperation.process
     File "apache_beam/runners/common.py", line 1215, in apache_beam.runners.common.DoFnRunner.process
     File "apache_beam/runners/common.py", line 1279, in apache_beam.runners.common.DoFnRunner._reraise_augmented
     File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
     File "apache_beam/runners/common.py", line 569, in apache_beam.runners.common.SimpleInvoker.invoke_process
     File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
     File "apache_beam/runners/worker/operations.py", line 218, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
     File "apache_beam/runners/worker/operations.py", line 703, in apache_beam.runners.worker.operations.DoOperation.process
     File "apache_beam/runners/worker/operations.py", line 704, in apache_beam.runners.worker.operations.DoOperation.process
     File "apache_beam/runners/common.py", line 1215, in apache_beam.runners.common.DoFnRunner.process
     File "apache_beam/runners/common.py", line 1294, in apache_beam.runners.common.DoFnRunner._reraise_augmented
     File "/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback
       raise exc.with_traceback(traceback)
     File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
     File "apache_beam/runners/common.py", line 742, in apache_beam.runners.common.PerWindowInvoker.invoke_process
     File "apache_beam/runners/common.py", line 804, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
     File "/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/internal/util.py", line 133, in insert_values_in_args
       new_kwargs = dict(
   RuntimeError: generator raised StopIteration [while running 'Partition(SideFn)/ParDo(ApplyPartitionFnFn)/ParDo(ApplyPartitionFnFn)']
   
   Process finished with exit code 1
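
The first two tracebacks above show a `StopIteration` raised by `next(v_iter)` inside a generator expression and then surfacing as `RuntimeError: generator raised StopIteration`. That conversion is standard Python behavior (PEP 479), not something specific to Beam. The sketch below reproduces only that mechanism in plain Python. `fill_placeholders` and its `None` placeholders are hypothetical stand-ins for illustration, not Beam's actual `insert_values_in_args`, and the sketch does not explain why Beam ran out of side-input values in the reported run.

```python
def fill_placeholders(values, side_values):
    """Hypothetical helper: swap each None placeholder for the next side value."""
    side_iter = iter(side_values)
    # next() raises StopIteration once side_iter is exhausted. Because the call
    # sits inside a generator expression, Python 3.7+ (PEP 479) converts it
    # into a RuntimeError instead of letting it end the iteration silently.
    return dict(
        (k, next(side_iter)) if v is None else (k, v)
        for k, v in values.items())


# One placeholder and one side value: the placeholder is filled in.
ok = fill_placeholders({'side': None, 'n': 99}, ['a'])
print(ok)

# Two placeholders but only one side value: the second next() call fails.
try:
    fill_placeholders({'side': None, 'other': None}, ['a'])
except RuntimeError as exc:
    message = str(exc)
    cause = type(exc.__cause__).__name__
    print(message, '| caused by', cause)
```

In this sketch the error appears only when there are more placeholders than values to fill them with, which matches the shape of the failure in the traceback, though the actual cause inside Beam may differ.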
   
   
   Imported from Jira [BEAM-11158](https://issues.apache.org/jira/browse/BEAM-11158). Original Jira may contain additional context.
   Reported by: weazelb0y.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] tvalentyn closed issue #20551: Side Inputs to beam.Partition

Posted by GitBox <gi...@apache.org>.
tvalentyn closed issue #20551: Side Inputs to beam.Partition
URL: https://github.com/apache/beam/issues/20551




[GitHub] [beam] tvalentyn commented on issue #20551: Side Inputs to beam.Partition

Posted by GitBox <gi...@apache.org>.
tvalentyn commented on issue #20551:
URL: https://github.com/apache/beam/issues/20551#issuecomment-1164290599

   I couldn't repro this. Tried:
   
   ```
   import apache_beam as beam
   
   def main():
   
     class SideFn(beam.PartitionFn):
       def partition_for(self, element, *args, **kwargs):
         print(element, args, kwargs)
         return 0
   
     def just_print(element, *args, **kwargs):
       print(element, args, kwargs)
   
     with beam.Pipeline() as p:
       side = p | 'CreateSide' >> beam.Create(['a'])
       p | beam.Create([1, 2, 3]) | beam.Partition(SideFn(), 99, side=beam.pvalue.AsSingleton(side))
   
       p | 'Create2' >> beam.Create([1, 2, 3]) | beam.ParDo(just_print, 99, side=beam.pvalue.AsSingleton(side))
   
   if __name__ == '__main__':
     main()
   ```
   
   ```
   python pipe.py
   1 (99,) {'side': 'a'}
   2 (99,) {'side': 'a'}
   3 (99,) {'side': 'a'}
   1 (99,) {'side': 'a'}
   2 (99,) {'side': 'a'}
   3 (99,) {'side': 'a'}
   ```

