Posted to issues@beam.apache.org by "Stephan Hoyer (Jira)" <ji...@apache.org> on 2021/04/27 17:20:03 UTC

[jira] [Created] (BEAM-12233) Python InteractiveRunner works when evaluated as a global but not in a function

Stephan Hoyer created BEAM-12233:
------------------------------------

             Summary: Python InteractiveRunner works when evaluated as a global but not in a function
                 Key: BEAM-12233
                 URL: https://issues.apache.org/jira/browse/BEAM-12233
             Project: Beam
          Issue Type: Bug
          Components: runner-py-interactive
         Environment: Python 3.7.10, Beam 2.28.0
            Reporter: Stephan Hoyer


The following snippet works when I run it in a Jupyter notebook (e.g., Google Colab):
{code:python}
import apache_beam as beam

transform = beam.Create([1, 2, 3])

p = beam.Pipeline('InteractiveRunner')
pcoll = p | transform
result = p.run()
result.wait_until_finish()
print(result.get(pcoll))  # [1, 2, 3]{code}
 

However, if I move pipeline creation and evaluation into a helper function, it fails with an unhelpful error:

 
{code:python}
import apache_beam as beam

def evaluate(transform):
  p = beam.Pipeline('InteractiveRunner')
  pcoll = p | transform
  result = p.run()
  result.wait_until_finish()
  return result.get(pcoll)

transform = beam.Create([1, 2, 3])
evaluate(transform){code}
This results in:
{code:none}
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-33-271cfe7be46f> in <module>()
      9 
     10 transform = beam.Create([1, 2, 3])
---> 11 evaluate(transform)

<ipython-input-33-271cfe7be46f> in evaluate(transform)
      4   p = beam.Pipeline('InteractiveRunner')
      5   pcoll = p | transform
----> 6   result = p.run()
      7   result.wait_until_finish()
      8   return result.get(pcoll)

/usr/local/lib/python3.7/dist-packages/apache_beam/pipeline.py in run(self, test_runner_api)
    557         finally:
    558           shutil.rmtree(tmpdir)
--> 559       return self.runner.run_pipeline(self, self._options)
    560     finally:
    561       shutil.rmtree(self.local_tempdir, ignore_errors=True)

/usr/local/lib/python3.7/dist-packages/apache_beam/runners/interactive/interactive_runner.py in run_pipeline(self, pipeline, options)
    134 
    135     # Make sure that sources without a user reference are still cached.
--> 136     inst.watch_sources(pipeline)
    137 
    138     user_pipeline = ie.current_env().user_pipeline(pipeline)

/usr/local/lib/python3.7/dist-packages/apache_beam/runners/interactive/pipeline_instrument.py in watch_sources(pipeline)
   1006           ie.current_env().watch({'synthetic_var_' + str(id(pcoll)): pcoll})
   1007 
-> 1008   retrieved_user_pipeline.visit(CacheableUnboundedPCollectionVisitor())

AttributeError: 'NoneType' object has no attribute 'visit'{code}
It would be nice if this worked or, failing that, if at least an instructive error message were printed :)
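
To illustrate the kind of error message meant here, a guard along the following lines could replace the bare AttributeError with something actionable. This is only a sketch and not Beam's code: `require_user_pipeline` and `UnwatchedPipelineError` are hypothetical names, and the message wording is an assumption about what would be helpful. The idea is simply to check the looked-up user pipeline for None before calling `.visit()` on it.

```python
class UnwatchedPipelineError(RuntimeError):
    """Hypothetical error type for illustration; not part of Apache Beam."""


def require_user_pipeline(retrieved_user_pipeline, pipeline):
    """Return the looked-up user pipeline, or raise a descriptive error.

    The function name and the message text are illustrative assumptions.
    """
    if retrieved_user_pipeline is None:
        raise UnwatchedPipelineError(
            f"No user pipeline is being tracked for {pipeline!r}. "
            "This can happen when the pipeline is created inside a function "
            "rather than at the top level of a notebook.")
    return retrieved_user_pipeline
```

With a check like this, the failing call in the traceback above would report that the pipeline is not being tracked instead of "'NoneType' object has no attribute 'visit'".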

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)