You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/03 22:00:43 UTC

[GitHub] [beam] kennknowles opened a new issue, #19027: Assert failing: node not reached

kennknowles opened a new issue, #19027:
URL: https://github.com/apache/beam/issues/19027

   I have a pipeline that fails on Beam 2.4.0 but not on 2.3.0. While the pipeline is being prepared, an assert fails, apparently because a side input cannot be accessed.
   
    
   
   This is the code:
   ```
   
   class AverageFn(beam.CombineFn):
       """CombineFn that averages its inputs, skipping ``None`` values.

       The accumulator is a ``(running_total, count)`` tuple.
       """

       def create_accumulator(self):
           """Return the empty accumulator: a zero total and a zero count."""
           return (0.0, 0)

       def add_input(self, sum_count, input):
           """Fold one input into the accumulator; ``None`` inputs are ignored."""
           (total, count) = sum_count
           if input is None:
               return total, count
           return total + input, count + 1

       def merge_accumulators(self, accumulators):
           """Merge several accumulators by summing their totals and counts."""
           sums, counts = zip(*accumulators)
           return sum(sums), sum(counts)

       def extract_output(self, sum_count):
           """Return the average truncated to ``int``, or NaN if count is zero."""
           (total, count) = sum_count
           return int(total / count) if count else float('NaN')
   
   ```
   
   ```
   
   with beam.Pipeline(options=options) as p:
       # The query is a single line of SQL; it is split here only to keep the
       # source line short (the two literals are concatenated).
       query = ("SELECT * FROM training_data.1529424357650_not_aggregated "
                "WHERE zip_code='60613'")
       datapoints = p | beam.io.Read(beam.io.BigQuerySource(query=query))

       # Filter the datapoints and key each remaining one by its zip code.
       county_datapoints = (
           datapoints
           | 'FilterDatapoints' >> beam.ParDo(FilterDatapoints())
           | 'ZipAsKey' >> beam.Map(lambda x: (x['zip_code'], x)))

       # Per-key average of year_built, as (key, average) tuples; the key is
       # the zip code set by 'ZipAsKey'.
       avg_year_built = (
           county_datapoints
           | 'YearBuiltAsValue' >> beam.Map(lambda x: (x[0], x[1]['year_built']))
           | "CombineYearBuilt" >> beam.CombinePerKey(AverageFn()))

       # avg_year_built is passed to ExtractFeatures as a dict side input.
       models = (
           county_datapoints
           | 'ExtractFeatures' >> beam.ParDo(
               ExtractFeatures(),
               avg_year_built=pvalue.AsDict(avg_year_built))
           | 'GroupByZip' >> beam.GroupByKey()
           | 'ComputeModels' >> beam.ParDo(ComputeModel()))

       models | beam.ParDo(SaveToBucket(), bucket=output_gcs_bucket)
   
   
   ```
   
   This is the traceback:
   ```
   
   Traceback (most recent call last):
   File "run.py", line 237, in <module>
   run()
   File "run.py", line 232, in run
   models | beam.ParDo(SaveToBucket(), bucket=output_gcs_bucket)
   File "/Users/nlassaux-enodo/go/src/github.com/enodoscore/data-science/market-rent/training/env/lib/python2.7/site-packages/apache_beam/pipeline.py",
   line 389, in __exit__
   self.run().wait_until_finish()
   File "/Users/nlassaux-enodo/go/src/github.com/enodoscore/data-science/market-rent/training/env/lib/python2.7/site-packages/apache_beam/pipeline.py",
   line 369, in run
   self.to_runner_api(), self.runner, self._options).run(False)
   File "/Users/nlassaux-enodo/go/src/github.com/enodoscore/data-science/market-rent/training/env/lib/python2.7/site-packages/apache_beam/pipeline.py",
   line 382, in run
   return self.runner.run_pipeline(self)
   File "/Users/nlassaux-enodo/go/src/github.com/enodoscore/data-science/market-rent/training/env/lib/python2.7/site-packages/apache_beam/runners/direct/direct_runner.py",
   line 129, in run_pipeline
   return runner.run_pipeline(pipeline)
   File "/Users/nlassaux-enodo/go/src/github.com/enodoscore/data-science/market-rent/training/env/lib/python2.7/site-packages/apache_beam/runners/direct/direct_runner.py",
   line 337, in run_pipeline
   pipeline.replace_all(_get_transform_overrides(pipeline.options))
   File "/Users/nlassaux-enodo/go/src/github.com/enodoscore/data-science/market-rent/training/env/lib/python2.7/site-packages/apache_beam/pipeline.py",
   line 354, in replace_all
   self._replace(override)
   File "/Users/nlassaux-enodo/go/src/github.com/enodoscore/data-science/market-rent/training/env/lib/python2.7/site-packages/apache_beam/pipeline.py",
   line 284, in _replace
   self.visit(TransformUpdater(self))
   File "/Users/nlassaux-enodo/go/src/github.com/enodoscore/data-science/market-rent/training/env/lib/python2.7/site-packages/apache_beam/pipeline.py",
   line 410, in visit
   self._root_transform().visit(visitor, self, visited)
   File "/Users/nlassaux-enodo/go/src/github.com/enodoscore/data-science/market-rent/training/env/lib/python2.7/site-packages/apache_beam/pipeline.py",
   line 764, in visit
   part.visit(visitor, pipeline, visited)
   File "/Users/nlassaux-enodo/go/src/github.com/enodoscore/data-science/market-rent/training/env/lib/python2.7/site-packages/apache_beam/pipeline.py",
   line 754, in visit
   assert pval in visited
   AssertionError
   
   ```
   
   If I print the pval that is not in `visited` (the one that makes the assert fail), this is what I get: PCollection[CombineYearBuilt/Combine/ParDo(CombineValuesDoFn).None]
   
   If I remove the assert, I get the following error:
   ```
   
   Traceback (most recent call last):
   File "run.py", line 237, in <module>
   run()
   File "run.py", line 232, in run
   models | beam.ParDo(SaveToBucket(), bucket=output_gcs_bucket)
   File "/Users/nlassaux-enodo/go/src/github.com/enodoscore/data-science/market-rent/training/env/lib/python2.7/site-packages/apache_beam/pipeline.py",
   line 389, in __exit__
   self.run().wait_until_finish()
   File "/Users/nlassaux-enodo/go/src/github.com/enodoscore/data-science/market-rent/training/env/lib/python2.7/site-packages/apache_beam/runners/direct/direct_runner.py",
   line 414, in wait_until_finish
   self._executor.await_completion()
   File "/Users/nlassaux-enodo/go/src/github.com/enodoscore/data-science/market-rent/training/env/lib/python2.7/site-packages/apache_beam/runners/direct/executor.py",
   line 360, in await_completion
   self._executor.await_completion()
   File "/Users/nlassaux-enodo/go/src/github.com/enodoscore/data-science/market-rent/training/env/lib/python2.7/site-packages/apache_beam/runners/direct/executor.py",
   line 403, in await_completion
   six.reraise(t, v, tb)
   File "<string>", line 3, in reraise
   Exception:
   Monitor task detected a pipeline stall.
   
   ```
   
    
   
    
   
   Imported from Jira [BEAM-4633](https://issues.apache.org/jira/browse/BEAM-4633). Original Jira may contain additional context.
   Reported by: nlassaux.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org