Posted to issues@beam.apache.org by "Stephan Hoyer (Jira)" <ji...@apache.org> on 2021/04/27 17:20:03 UTC
[jira] [Created] (BEAM-12233) Python InteractiveRunner works when evaluated as a global but not in a function
Stephan Hoyer created BEAM-12233:
------------------------------------
Summary: Python InteractiveRunner works when evaluated as a global but not in a function
Key: BEAM-12233
URL: https://issues.apache.org/jira/browse/BEAM-12233
Project: Beam
Issue Type: Bug
Components: runner-py-interactive
Environment: Python 3.7.10, Beam 2.28.0
Reporter: Stephan Hoyer
The following snippet works when I run it in a Jupyter notebook (e.g., Google Colab):
{code:python}
import apache_beam as beam

transform = beam.Create([1, 2, 3])
p = beam.Pipeline('InteractiveRunner')
pcoll = p | transform
result = p.run()
result.wait_until_finish()
print(result.get(pcoll))  # [1, 2, 3]
{code}
However, if I move the pipeline creation and evaluation into a helper function, it fails with a mysterious error:
{code:python}
import apache_beam as beam

def evaluate(transform):
    p = beam.Pipeline('InteractiveRunner')
    pcoll = p | transform
    result = p.run()
    result.wait_until_finish()
    return result.get(pcoll)

transform = beam.Create([1, 2, 3])
evaluate(transform)
{code}
This results in:
{code:java}
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
<ipython-input-33-271cfe7be46f> in <module>()
9
10 transform = beam.Create([1, 2, 3])
---> 11 evaluate(transform)
<ipython-input-33-271cfe7be46f> in evaluate(transform)
4 p = beam.Pipeline('InteractiveRunner')
5 pcoll = p | transform
----> 6 result = p.run()
7 result.wait_until_finish()
8 return result.get(pcoll)
/usr/local/lib/python3.7/dist-packages/apache_beam/pipeline.py in run(self, test_runner_api)
557 finally:
558 shutil.rmtree(tmpdir)
--> 559 return self.runner.run_pipeline(self, self._options)
560 finally:
561 shutil.rmtree(self.local_tempdir, ignore_errors=True)
/usr/local/lib/python3.7/dist-packages/apache_beam/runners/interactive/interactive_runner.py in run_pipeline(self, pipeline, options)
134
135 # Make sure that sources without a user reference are still cached.
--> 136 inst.watch_sources(pipeline)
137
138 user_pipeline = ie.current_env().user_pipeline(pipeline)
/usr/local/lib/python3.7/dist-packages/apache_beam/runners/interactive/pipeline_instrument.py in watch_sources(pipeline)
1006 ie.current_env().watch({'synthetic_var_' + str(id(pcoll)): pcoll})
1007
-> 1008 retrieved_user_pipeline.visit(CacheableUnboundedPCollectionVisitor())
AttributeError: 'NoneType' object has no attribute 'visit'
{code}
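The traceback hints at the cause. `watch_sources` registers values through `ie.current_env().watch(...)` using a dict that maps names to objects, and it then calls `.visit(...)` on `retrieved_user_pipeline`, which turns out to be `None`. One plausible reading, inferred from the traceback and not confirmed against the Beam source, is that the interactive environment locates the user pipeline by scanning watched namespaces. By default those would cover the notebook's globals but not a function's local variables. The stdlib-only sketch below models that assumption with hypothetical `ToyPipeline` and `ToyEnvironment` classes. These are illustrations only and are not Beam APIs.

```python
# Hypothetical model of namespace-based pipeline discovery. ToyPipeline and
# ToyEnvironment are illustrative stand-ins, not Beam classes.


class ToyPipeline:
    """Stand-in for a user-defined pipeline object."""


class ToyEnvironment:
    """Finds user objects by scanning the namespaces it has been told to watch."""

    def __init__(self, default_namespace):
        # Assumption: only the notebook's global namespace is watched by default.
        self._watched = [default_namespace]

    def watch(self, namespace):
        # Register an extra namespace, such as a function's locals().
        self._watched.append(namespace)

    def user_pipeline(self, pipeline):
        # Return the pipeline only if some watched namespace refers to it.
        for namespace in self._watched:
            for value in namespace.values():
                if value is pipeline:
                    return value
        return None


notebook_globals = {}
env = ToyEnvironment(notebook_globals)

# Case 1: the pipeline is bound to a global name, as in the working snippet.
notebook_globals["p"] = ToyPipeline()
print(env.user_pipeline(notebook_globals["p"]) is not None)  # True


# Case 2: the pipeline only exists as a local inside a helper function.
def evaluate(env, watch_locals=False):
    p = ToyPipeline()
    if watch_locals:
        env.watch(locals())
    return env.user_pipeline(p)


print(evaluate(env))  # None: the local 'p' was never watched
print(evaluate(env, watch_locals=True) is not None)  # True
```

If this model is right, a possible workaround is to register the function's locals before calling `p.run()`, for example with `watch(locals())` from `apache_beam.runners.interactive.interactive_beam`. This has not been verified against Beam 2.28.0.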
It would be nice if this worked, or, failing that, if at least an informative error message were shown :)
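For the fallback of an informative error, one option is to check the lookup result before using it, instead of letting `None` reach `.visit(...)`. The sketch below is hypothetical. The names `PipelineNotWatchedError` and `find_user_pipeline` are invented for illustration, and the code assumes the pipeline is located by scanning watched namespaces. That assumption is inferred from the traceback rather than taken from Beam's code.

```python
# Hypothetical helper: PipelineNotWatchedError and find_user_pipeline are
# illustrative names, not part of Beam.


class PipelineNotWatchedError(RuntimeError):
    """Raised when a pipeline cannot be found in any watched namespace."""


def find_user_pipeline(pipeline, watched_namespaces):
    """Return `pipeline` if a watched namespace refers to it; otherwise raise.

    Failing here with a descriptive message avoids handing None to a later
    `.visit(...)` call, which is what produced the AttributeError in the report.
    """
    for namespace in watched_namespaces:
        for value in namespace.values():
            if value is pipeline:
                return value
    raise PipelineNotWatchedError(
        "This pipeline was not found in any watched namespace. Pipelines "
        "created inside a function are not visible unless the function's "
        "variables are explicitly watched before run() is called."
    )


# A pipeline-like object that no watched namespace refers to.
try:
    find_user_pipeline(object(), [{}])
except PipelineNotWatchedError as err:
    message = str(err)

print(message)
```

Raising a dedicated exception at the lookup site keeps the failure close to its cause. It also gives the user a hint about the fix, instead of surfacing an unrelated `NoneType` attribute error.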
--
This message was sent by Atlassian Jira
(v8.3.4#803005)