Posted to issues@beam.apache.org by "Beam JIRA Bot (Jira)" <ji...@apache.org> on 2021/07/11 17:21:01 UTC
[jira] [Updated] (BEAM-12233) Python InteractiveRunner works when evaluated as a global but not in a function
[ https://issues.apache.org/jira/browse/BEAM-12233?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Beam JIRA Bot updated BEAM-12233:
---------------------------------
Priority: P3 (was: P2)
> Python InteractiveRunner works when evaluated as a global but not in a function
> -------------------------------------------------------------------------------
>
> Key: BEAM-12233
> URL: https://issues.apache.org/jira/browse/BEAM-12233
> Project: Beam
> Issue Type: Bug
> Components: runner-py-interactive
> Environment: Python 3.7.10, Beam 2.28.0
> Reporter: Stephan Hoyer
> Priority: P3
> Labels: stale-P2
>
> The following snippet works when I run it in a Jupyter notebook (e.g., Google Colab):
> {code:python}
> import apache_beam as beam
> transform = beam.Create([1, 2, 3])
> p = beam.Pipeline('InteractiveRunner')
> pcoll = p | transform
> result = p.run()
> result.wait_until_finish()
> print(result.get(pcoll)) # [1, 2, 3]{code}
>
> However, if I move the pipeline creation and evaluation into a helper function, it fails with a mysterious error:
>
> {code:python}
> import apache_beam as beam
>
> def evaluate(transform):
>     p = beam.Pipeline('InteractiveRunner')
>     pcoll = p | transform
>     result = p.run()
>     result.wait_until_finish()
>     return result.get(pcoll)
>
> transform = beam.Create([1, 2, 3])
> evaluate(transform){code}
> This results in:
> {code:java}
> ---------------------------------------------------------------------------
> AttributeError Traceback (most recent call last)
> <ipython-input-33-271cfe7be46f> in <module>()
> 9
> 10 transform = beam.Create([1, 2, 3])
> ---> 11 evaluate(transform)
> <ipython-input-33-271cfe7be46f> in evaluate(transform)
> 4 p = beam.Pipeline('InteractiveRunner')
> 5 pcoll = p | transform
> ----> 6 result = p.run()
> 7 result.wait_until_finish()
> 8 return result.get(pcoll)
> /usr/local/lib/python3.7/dist-packages/apache_beam/pipeline.py in run(self, test_runner_api)
> 557 finally:
> 558 shutil.rmtree(tmpdir)
> --> 559 return self.runner.run_pipeline(self, self._options)
> 560 finally:
> 561 shutil.rmtree(self.local_tempdir, ignore_errors=True)
> /usr/local/lib/python3.7/dist-packages/apache_beam/runners/interactive/interactive_runner.py in run_pipeline(self, pipeline, options)
> 134
> 135 # Make sure that sources without a user reference are still cached.
> --> 136 inst.watch_sources(pipeline)
> 137
> 138 user_pipeline = ie.current_env().user_pipeline(pipeline)
> /usr/local/lib/python3.7/dist-packages/apache_beam/runners/interactive/pipeline_instrument.py in watch_sources(pipeline)
> 1006 ie.current_env().watch({'synthetic_var_' + str(id(pcoll)): pcoll})
> 1007
> -> 1008 retrieved_user_pipeline.visit(CacheableUnboundedPCollectionVisitor())
> AttributeError: 'NoneType' object has no attribute 'visit'{code}
> It would be nice if this worked or, failing that, if at least an instructive error message were printed :)
>
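The final frame of the traceback shows the immediate cause: in watch_sources, retrieved_user_pipeline is None, so calling .visit on it raises the AttributeError. A plausible explanation, assumed here rather than confirmed by the report, is that the interactive environment only recognizes pipelines it can find in namespaces it watches (such as the notebook's globals), so a pipeline held only in a function's local variable is never found. The stdlib-only sketch below models that assumption. WatchedEnv, Pipeline, find_user_pipeline, watch_sources, and evaluate here are hypothetical stand-ins, not Beam's real classes or signatures. It also shows the kind of explicit None check that would replace the bare AttributeError with an instructive message.

```python
# Hypothetical model of the suspected failure mode; NOT Beam's actual code.
from typing import Any, Dict, List, Optional


class Pipeline:
    """Hypothetical placeholder for a user pipeline (not apache_beam.Pipeline)."""

    def visit(self, visitor: Any) -> str:
        # The real visit walks the pipeline graph; here it just reports success.
        return "visited"


class WatchedEnv:
    """Hypothetical environment that only sees variables in namespaces it watches."""

    def __init__(self) -> None:
        self._watching: List[Dict[str, Any]] = []

    def watch(self, namespace: Dict[str, Any]) -> None:
        # Register a namespace dict, e.g. a module's globals() or a function's locals().
        self._watching.append(namespace)

    def find_user_pipeline(self, pipeline: Pipeline) -> Optional[Pipeline]:
        # Return the pipeline only if some watched namespace holds that exact object.
        for namespace in self._watching:
            for value in namespace.values():
                if value is pipeline:
                    return value
        return None  # The pipeline lives only in an unwatched scope.


def watch_sources(env: WatchedEnv, pipeline: Pipeline) -> str:
    retrieved = env.find_user_pipeline(pipeline)
    if retrieved is None:
        # Without this check, retrieved.visit(...) would raise
        # AttributeError: 'NoneType' object has no attribute 'visit'.
        raise RuntimeError(
            "Pipeline not found in any watched namespace; if it was created "
            "inside a function, watch that function's locals() before running it."
        )
    return retrieved.visit(visitor=None)


def evaluate(env: WatchedEnv, watch_locals: bool) -> str:
    p = Pipeline()  # Held only in this function's local scope.
    if watch_locals:
        env.watch(locals())  # Snapshot of the local namespace, which includes p.
    return watch_sources(env, p)


env = WatchedEnv()
global_p = Pipeline()
env.watch({"global_p": global_p})  # Stands in for the notebook's globals().
print(watch_sources(env, global_p))  # prints "visited"
print(evaluate(env, watch_locals=True))  # prints "visited"
try:
    evaluate(env, watch_locals=False)  # Local pipeline is never watched.
except RuntimeError as err:
    print(f"RuntimeError: {err}")
```

If the assumption holds, Beam's own interactive_beam.watch, which accepts a namespace dict such as locals(), might work around the error when called inside evaluate before p.run(); this has not been verified against Beam 2.28.0 here.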
--
This message was sent by Atlassian Jira
(v8.3.4#803005)