Posted to issues@beam.apache.org by "Beam JIRA Bot (Jira)" <ji...@apache.org> on 2021/07/11 17:21:01 UTC

[jira] [Updated] (BEAM-12233) Python InteractiveRunner works when evaluated as a global but not in a function

     [ https://issues.apache.org/jira/browse/BEAM-12233?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Beam JIRA Bot updated BEAM-12233:
---------------------------------
    Priority: P3  (was: P2)

> Python InteractiveRunner works when evaluated as a global but not in a function
> -------------------------------------------------------------------------------
>
>                 Key: BEAM-12233
>                 URL: https://issues.apache.org/jira/browse/BEAM-12233
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-py-interactive
>         Environment: Python 3.7.10, Beam 2.28.0
>            Reporter: Stephan Hoyer
>            Priority: P3
>              Labels: stale-P2
>
> The following snippet works when I run it in a Jupyter notebook (e.g., Google Colab):
> {code:python}
> import apache_beam as beam
> transform = beam.Create([1, 2, 3])
> p = beam.Pipeline('InteractiveRunner')
> pcoll = p | transform
> result = p.run()
> result.wait_until_finish()
> print(result.get(pcoll))  # [1, 2, 3]{code}
>
> However, if I move creating and evaluating the pipeline into a helper function, it fails with a cryptic error:
>
> {code:python}
> import apache_beam as beam
> def evaluate(transform):
>   p = beam.Pipeline('InteractiveRunner')
>   pcoll = p | transform
>   result = p.run()
>   result.wait_until_finish()
>   return result.get(pcoll)
> transform = beam.Create([1, 2, 3])
> evaluate(transform){code}
> This results in:
> {code:java}
> ---------------------------------------------------------------------------
> AttributeError                            Traceback (most recent call last)
> <ipython-input-33-271cfe7be46f> in <module>()
>       9 
>      10 transform = beam.Create([1, 2, 3])
> ---> 11 evaluate(transform)
> <ipython-input-33-271cfe7be46f> in evaluate(transform)
>       4   p = beam.Pipeline('InteractiveRunner')
>       5   pcoll = p | transform
> ----> 6   result = p.run()
>       7   result.wait_until_finish()
>       8   return result.get(pcoll)
> /usr/local/lib/python3.7/dist-packages/apache_beam/pipeline.py in run(self, test_runner_api)
>     557         finally:
>     558           shutil.rmtree(tmpdir)
> --> 559       return self.runner.run_pipeline(self, self._options)
>     560     finally:
>     561       shutil.rmtree(self.local_tempdir, ignore_errors=True)
> /usr/local/lib/python3.7/dist-packages/apache_beam/runners/interactive/interactive_runner.py in run_pipeline(self, pipeline, options)
>     134 
>     135     # Make sure that sources without a user reference are still cached.
> --> 136     inst.watch_sources(pipeline)
>     137 
>     138     user_pipeline = ie.current_env().user_pipeline(pipeline)
> /usr/local/lib/python3.7/dist-packages/apache_beam/runners/interactive/pipeline_instrument.py in watch_sources(pipeline)
>    1006           ie.current_env().watch({'synthetic_var_' + str(id(pcoll)): pcoll})
>    1007 
> -> 1008   retrieved_user_pipeline.visit(CacheableUnboundedPCollectionVisitor())
> AttributeError: 'NoneType' object has no attribute 'visit'{code}
> It would be nice if this worked, or failing that, if at least an instructive error message were printed :)
>  
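The last frame of the traceback shows that `watch_sources` received `None` for `retrieved_user_pipeline`, i.e. the interactive environment did not recognize the pipeline built inside `evaluate` as a user pipeline. The sketch below is a toy model only, assuming (not confirmed by the report) that the environment discovers pipelines by scanning namespaces it watches, such as the notebook's globals. Under that assumption a pipeline that exists only as a function local is invisible. `ToyEnvironment`, `require_user_pipeline`, and the stand-in `Pipeline` class are hypothetical names, not Beam APIs; the guard illustrates the kind of instructive error the report asks for.

```python
# Toy model only: NOT Beam's implementation. Assumes the interactive
# environment finds "user pipelines" by scanning namespaces it watches.
import types
from typing import Dict, List, Optional


class Pipeline:
    """Hypothetical stand-in for a pipeline object (not apache_beam.Pipeline)."""


class ToyEnvironment:
    """Hypothetical registry that only sees pipelines in watched namespaces."""

    def __init__(self) -> None:
        self._watched: List[Dict[str, object]] = []

    def watch(self, namespace: Dict[str, object]) -> None:
        # Remember a namespace, e.g. a module's globals or a function's locals().
        self._watched.append(namespace)

    def user_pipeline(self, pipeline: Pipeline) -> Optional[Pipeline]:
        # Return the pipeline only if some watched namespace holds a reference
        # to it; otherwise return None, like the lookup in the traceback.
        for namespace in self._watched:
            if any(value is pipeline for value in namespace.values()):
                return pipeline
        return None

    def require_user_pipeline(self, pipeline: Pipeline) -> Pipeline:
        # Fail fast with an instructive message instead of letting a later
        # attribute access on None raise AttributeError.
        found = self.user_pipeline(pipeline)
        if found is None:
            raise RuntimeError(
                "Pipeline is not referenced from any watched namespace. If it "
                "was created inside a function, watch that function's locals().")
        return found


# Simulate a notebook: the environment watches the __main__ module's globals.
main = types.ModuleType("__main__")
env = ToyEnvironment()
env.watch(vars(main))
main.p = Pipeline()  # a "global" pipeline is found by the scan


def evaluate(watch_locals: bool) -> Pipeline:
    p = Pipeline()  # exists only as a function local, as in the report
    if watch_locals:
        env.watch(locals())  # make the local visible to the environment
    return env.require_user_pipeline(p)
```

In this model, `evaluate(False)` raises the descriptive `RuntimeError`, while `evaluate(True)` succeeds. Beam does expose `apache_beam.runners.interactive.interactive_beam.watch`, which accepts a mapping such as `locals()`; calling it inside the helper before `p.run()` is a plausible workaround, but it has not been verified against Beam 2.28.0 here.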



--
This message was sent by Atlassian Jira
(v8.3.4#803005)