Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/03 22:00:43 UTC
[GitHub] [beam] kennknowles opened a new issue, #19027: Assert failing: node not reached
kennknowles opened a new issue, #19027:
URL: https://github.com/apache/beam/issues/19027
I have a pipeline that fails on 2.4.0 but not on 2.3.0. An assert fails while the pipeline is being prepared, apparently because a side input cannot be accessed.
This is the code:
```
import apache_beam as beam


class AverageFn(beam.CombineFn):
    """Averages numeric inputs, ignoring None values."""

    def create_accumulator(self):
        # Accumulator is a (running_total, count) pair.
        return (0.0, 0)

    def add_input(self, acc, value):
        total, count = acc
        if value is None:
            return total, count
        return total + value, count + 1

    def merge_accumulators(self, accumulators):
        totals, counts = zip(*accumulators)
        return sum(totals), sum(counts)

    def extract_output(self, acc):
        total, count = acc
        # Integer average, or NaN if no non-None value was seen.
        return int(total / count) if count else float('NaN')
```
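To check the combiner's arithmetic independently of any runner, the sketch below drives the same four `CombineFn` methods by hand in plain Python. It assumes a simplified model of how `CombinePerKey` uses a `CombineFn` (one accumulator per key per bundle, then a merge, then `extract_output`). `PlainAverageFn` and `combine_per_key_by_hand` are illustrative names, not Beam APIs, and the `beam.CombineFn` base class is dropped so no Beam install is needed.

```python
class PlainAverageFn:
    """Same logic as AverageFn above, without the beam.CombineFn base class."""

    def create_accumulator(self):
        return (0.0, 0)

    def add_input(self, acc, value):
        total, count = acc
        if value is None:
            return total, count
        return total + value, count + 1

    def merge_accumulators(self, accumulators):
        totals, counts = zip(*accumulators)
        return sum(totals), sum(counts)

    def extract_output(self, acc):
        total, count = acc
        return int(total / count) if count else float('nan')


def combine_per_key_by_hand(fn, pairs, num_bundles=2):
    """Rough model of CombinePerKey: split the input into bundles, build one
    accumulator per key per bundle, merge them per key, then extract."""
    bundles = [pairs[i::num_bundles] for i in range(num_bundles)]
    partials = {}
    for bundle in bundles:
        local = {}
        for key, value in bundle:
            acc = local.get(key, fn.create_accumulator())
            local[key] = fn.add_input(acc, value)
        for key, acc in local.items():
            partials.setdefault(key, []).append(acc)
    return {key: fn.extract_output(fn.merge_accumulators(accs))
            for key, accs in partials.items()}


pairs = [('60613', 1990), ('60613', 2000), ('60613', None),
         ('60613', 2010), ('60614', 1925)]
print(combine_per_key_by_hand(PlainAverageFn(), pairs))
# {'60613': 2000, '60614': 1925}
```

Because `extract_output` truncates with `int(...)`, each per-key average is an integer year; a key whose values are all `None` yields `NaN`.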
```
import apache_beam as beam
from apache_beam import pvalue

# options, output_gcs_bucket, FilterDatapoints, ExtractFeatures,
# ComputeModel and SaveToBucket are defined elsewhere (not shown here).
with beam.Pipeline(options=options) as p:
    query = ("SELECT * FROM training_data.1529424357650_not_aggregated "
             "WHERE zip_code='60613'")
    datapoints = p | beam.io.Read(beam.io.BigQuerySource(query=query))

    # Key each datapoint by its zip code.
    county_datapoints = (datapoints
                         | 'FilterDatapoints' >> beam.ParDo(FilterDatapoints())
                         | 'ZipAsKey' >> beam.Map(lambda x: (x['zip_code'], x)))

    # Creates a collection of tuples: (<zip_code>:str, <avg_year_built>:int)
    avg_year_built = (county_datapoints
                      | 'YearBuiltAsValue' >> beam.Map(lambda x: (x[0], x[1]['year_built']))
                      | 'CombineYearBuilt' >> beam.CombinePerKey(AverageFn()))

    models = (county_datapoints
              | 'ExtractFeatures' >> beam.ParDo(
                  ExtractFeatures(),
                  avg_year_built=pvalue.AsDict(avg_year_built))
              | 'GroupByZip' >> beam.GroupByKey()
              | 'ComputeModels' >> beam.ParDo(ComputeModel()))

    models | beam.ParDo(SaveToBucket(), bucket=output_gcs_bucket)
```
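For reference, here is a plain-Python model of what `pvalue.AsDict(avg_year_built)` hands to `ExtractFeatures`, assuming a bounded (batch) pipeline in the global window: the whole `avg_year_built` collection is gathered into a single dict before main-input elements are processed. `materialize_as_dict` and `extract_features` are hypothetical names, and the body of `extract_features` is invented for illustration because the report does not show `ExtractFeatures`.

```python
def materialize_as_dict(side_pcoll):
    """Rough model of pvalue.AsDict: the complete side-input collection of
    (key, value) pairs becomes one dict shared by every main-input element."""
    return dict(side_pcoll)


def extract_features(element, avg_year_built):
    """Hypothetical stand-in for ExtractFeatures.process: fill a missing
    year_built with the average for the same zip code."""
    zip_code, row = element
    year = row.get('year_built')
    if year is None:
        year = avg_year_built.get(zip_code)
    return zip_code, {**row, 'year_built': year}


county_datapoints = [('60613', {'year_built': 1990}),
                     ('60613', {'year_built': None})]
avg_year_built = [('60613', 1990)]

side = materialize_as_dict(avg_year_built)
features = [extract_features(e, side) for e in county_datapoints]
print(features)
# [('60613', {'year_built': 1990}), ('60613', {'year_built': 1990})]
```

Since the consumer needs the complete side input, the runner must track which transform produces `avg_year_built`; that producer/consumer edge is plausibly what the failing check in the traceback is about.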
This is the traceback:
```
Traceback (most recent call last):
  File "run.py", line 237, in <module>
    run()
  File "run.py", line 232, in run
    models | beam.ParDo(SaveToBucket(), bucket=output_gcs_bucket)
  File "/Users/nlassaux-enodo/go/src/github.com/enodoscore/data-science/market-rent/training/env/lib/python2.7/site-packages/apache_beam/pipeline.py", line 389, in __exit__
    self.run().wait_until_finish()
  File "/Users/nlassaux-enodo/go/src/github.com/enodoscore/data-science/market-rent/training/env/lib/python2.7/site-packages/apache_beam/pipeline.py", line 369, in run
    self.to_runner_api(), self.runner, self._options).run(False)
  File "/Users/nlassaux-enodo/go/src/github.com/enodoscore/data-science/market-rent/training/env/lib/python2.7/site-packages/apache_beam/pipeline.py", line 382, in run
    return self.runner.run_pipeline(self)
  File "/Users/nlassaux-enodo/go/src/github.com/enodoscore/data-science/market-rent/training/env/lib/python2.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 129, in run_pipeline
    return runner.run_pipeline(pipeline)
  File "/Users/nlassaux-enodo/go/src/github.com/enodoscore/data-science/market-rent/training/env/lib/python2.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 337, in run_pipeline
    pipeline.replace_all(_get_transform_overrides(pipeline.options))
  File "/Users/nlassaux-enodo/go/src/github.com/enodoscore/data-science/market-rent/training/env/lib/python2.7/site-packages/apache_beam/pipeline.py", line 354, in replace_all
    self._replace(override)
  File "/Users/nlassaux-enodo/go/src/github.com/enodoscore/data-science/market-rent/training/env/lib/python2.7/site-packages/apache_beam/pipeline.py", line 284, in _replace
    self.visit(TransformUpdater(self))
  File "/Users/nlassaux-enodo/go/src/github.com/enodoscore/data-science/market-rent/training/env/lib/python2.7/site-packages/apache_beam/pipeline.py", line 410, in visit
    self._root_transform().visit(visitor, self, visited)
  File "/Users/nlassaux-enodo/go/src/github.com/enodoscore/data-science/market-rent/training/env/lib/python2.7/site-packages/apache_beam/pipeline.py", line 764, in visit
    part.visit(visitor, pipeline, visited)
  File "/Users/nlassaux-enodo/go/src/github.com/enodoscore/data-science/market-rent/training/env/lib/python2.7/site-packages/apache_beam/pipeline.py", line 754, in visit
    assert pval in visited
AssertionError
```
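If I print the `pval` that is missing from `visited` (the one that makes the assertion fail), I get `PCollection[CombineYearBuilt/Combine/ParDo(CombineValuesDoFn).None]`. It is produced inside the `CombineYearBuilt` step, whose output feeds the `avg_year_built` side input.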
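The assertion at `pipeline.py` line 754 checks that each input of a transform was reached before the transform itself. The sketch below is a simplified toy model of that kind of check, not Beam's actual `visit` code: transforms are walked in order, each PCollection they produce is recorded in `visited`, and every main or side input must already be there. The broken graph assumes, purely for illustration, that a replaced producer stops recording its output, which is one hypothetical way a side input such as `avg_year_built` could end up unreached.

```python
def walk(transforms):
    """Visit (name, inputs, side_inputs, outputs) tuples in order, asserting
    that every PCollection a transform reads was produced earlier."""
    visited = set()
    order = []
    for name, inputs, side_inputs, outputs in transforms:
        for pval in list(inputs) + list(side_inputs):
            assert pval in visited, 'node not reached: %s (read by %s)' % (pval, name)
        visited.update(outputs)
        order.append(name)
    return order


graph = [
    ('Read', [], [], ['rows']),
    ('ZipAsKey', ['rows'], [], ['keyed']),
    ('CombineYearBuilt', ['keyed'], [], ['avg']),
    ('ExtractFeatures', ['keyed'], ['avg'], ['features']),
]
print(walk(graph))  # ['Read', 'ZipAsKey', 'CombineYearBuilt', 'ExtractFeatures']

# Hypothetical broken graph: the replaced combine no longer records 'avg'.
broken = [t if t[0] != 'CombineYearBuilt'
          else ('CombineYearBuilt', ['keyed'], [], [])
          for t in graph]
try:
    walk(broken)
except AssertionError as err:
    print(err)  # node not reached: avg (read by ExtractFeatures)
```

The side input is checked exactly like a main input here, so a missing producer surfaces as an assertion before anything runs rather than as a stall at execution time.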
If I remove the assert, I get the following error:
```
Traceback (most recent call last):
  File "run.py", line 237, in <module>
    run()
  File "run.py", line 232, in run
    models | beam.ParDo(SaveToBucket(), bucket=output_gcs_bucket)
  File "/Users/nlassaux-enodo/go/src/github.com/enodoscore/data-science/market-rent/training/env/lib/python2.7/site-packages/apache_beam/pipeline.py", line 389, in __exit__
    self.run().wait_until_finish()
  File "/Users/nlassaux-enodo/go/src/github.com/enodoscore/data-science/market-rent/training/env/lib/python2.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 414, in wait_until_finish
    self._executor.await_completion()
  File "/Users/nlassaux-enodo/go/src/github.com/enodoscore/data-science/market-rent/training/env/lib/python2.7/site-packages/apache_beam/runners/direct/executor.py", line 360, in await_completion
    self._executor.await_completion()
  File "/Users/nlassaux-enodo/go/src/github.com/enodoscore/data-science/market-rent/training/env/lib/python2.7/site-packages/apache_beam/runners/direct/executor.py", line 403, in await_completion
    six.reraise(t, v, tb)
  File "<string>", line 3, in reraise
Exception: Monitor task detected a pipeline stall.
```
Imported from Jira [BEAM-4633](https://issues.apache.org/jira/browse/BEAM-4633). Original Jira may contain additional context.
Reported by: nlassaux.