Posted to issues@beam.apache.org by "Jongbin Park (Jira)" <ji...@apache.org> on 2021/10/13 07:33:00 UTC
[jira] [Updated] (BEAM-13040) AsIter side input is not correctly
recognized as a dependency.
[ https://issues.apache.org/jira/browse/BEAM-13040?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jongbin Park updated BEAM-13040:
--------------------------------
Summary: AsIter side input is not correctly recognized as a dependency. (was: AsIter side input is not currectly recognized as a dependency.)
> AsIter side input is not correctly recognized as a dependency.
> --------------------------------------------------------------
>
> Key: BEAM-13040
> URL: https://issues.apache.org/jira/browse/BEAM-13040
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-core
> Environment: Linux Debian 5.10 x86_64
> Python 3.8.11
> Reporter: Jongbin Park
> Priority: P2
>
> The error occurs on current master (HEAD). It does not occur on the latest release (2.33.0).
> Example to reproduce:
> {code:python}
> import unittest
>
> import apache_beam as beam
>
>
> class LoggingFn(beam.DoFn):
>   def __init__(self, name):
>     self._name = name
>
>   def process(self, element, *side_inputs):
>     print(f'Running {self._name} (side inputs: {[list(s) for s in side_inputs]})')
>     return [self._name]
>
>
> class BeamDagTest(unittest.TestCase):
>   def test_dag(self):
>     with beam.Pipeline() as p:
>       root = p | 'CreateRoot' >> beam.Create([None])
>       example_gen = root | 'CsvExampleGen' >> beam.ParDo(
>           LoggingFn('CsvExampleGen'),
>       )
>       statistics_gen = root | 'StatisticsGen' >> beam.ParDo(
>           LoggingFn('StatisticsGen'),
>           beam.pvalue.AsIter(example_gen),  # AsIter to specify upstream task dependency.
>       )
>       schema_gen = root | 'SchemaGen' >> beam.ParDo(
>           LoggingFn('SchemaGen'),
>           beam.pvalue.AsIter(statistics_gen),
>       )
>       example_validator = root | 'ExampleValidator' >> beam.ParDo(
>           LoggingFn('ExampleValidator'),
>           beam.pvalue.AsIter(statistics_gen),
>           beam.pvalue.AsIter(schema_gen),
>       )
>       transform = root | 'Transform' >> beam.ParDo(
>           LoggingFn('Transform'),
>           beam.pvalue.AsIter(example_gen),
>           beam.pvalue.AsIter(schema_gen),
>       )
>       trainer = root | 'Trainer' >> beam.ParDo(
>           LoggingFn('Trainer'),
>           beam.pvalue.AsIter(example_gen),
>           beam.pvalue.AsIter(schema_gen),
>           beam.pvalue.AsIter(transform),
>       )
>       model_resolver = root | 'latest_blessed_model_resolver' >> beam.ParDo(
>           LoggingFn('latest_blessed_model_resolver'),
>       )
>       evaluator = root | 'Evaluator' >> beam.ParDo(
>           LoggingFn('Evaluator'),
>           beam.pvalue.AsIter(example_gen),
>           beam.pvalue.AsIter(trainer),
>           beam.pvalue.AsIter(model_resolver),
>       )
>       pusher = root | 'Pusher' >> beam.ParDo(
>           LoggingFn('Pusher'),
>           beam.pvalue.AsIter(trainer),
>           beam.pvalue.AsIter(evaluator),
>       ){code}
>
> According to the AsIter [documentation|https://github.com/apache/beam/blob/64ec15fa2208d8f9b5ca5653866e1992fd07f7dc/sdks/python/apache_beam/pvalue.py#L527], the entire PCollection should be made available as a side input before the ParDo runs, which means AsIter can be used to specify a task dependency. However, this does not work on current master (71d7213d98): some steps run before their side inputs are populated and receive empty iterables.
>
> Output with apache-beam==2.33.0:
> {code:java}
> Running CsvExampleGen (side inputs: [])
> Running latest_blessed_model_resolver (side inputs: [])
> Running StatisticsGen (side inputs: [['CsvExampleGen']])
> Running SchemaGen (side inputs: [['StatisticsGen']])
> Running ExampleValidator (side inputs: [['StatisticsGen'], ['SchemaGen']])
> Running Transform (side inputs: [['CsvExampleGen'], ['SchemaGen']])
> Running Trainer (side inputs: [['CsvExampleGen'], ['SchemaGen'], ['Transform']])
> Running Evaluator (side inputs: [['CsvExampleGen'], ['Trainer'], ['latest_blessed_model_resolver']])
> Running Pusher (side inputs: [['Trainer'], ['Evaluator']]){code}
> Output with apache-beam installed from 71d7213d98 (origin/master):
> {code:java}
> Running CsvExampleGen (side inputs: [])
> Running latest_blessed_model_resolver (side inputs: [])
> Running StatisticsGen (side inputs: [['CsvExampleGen']])
> Running Pusher (side inputs: [[], []])
> Running Evaluator (side inputs: [['CsvExampleGen'], [], ['latest_blessed_model_resolver']])
> Running SchemaGen (side inputs: [['StatisticsGen']])
> Running Trainer (side inputs: [['CsvExampleGen'], ['SchemaGen'], []])
> Running ExampleValidator (side inputs: [['StatisticsGen'], ['SchemaGen']])
> Running Transform (side inputs: [['CsvExampleGen'], ['SchemaGen']]){code}
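
The ordering violation in the second log can be checked mechanically. The following is a minimal sketch and not part of Beam: it assumes the dependency map mirrors the AsIter arguments in the reproduction above, and `first_violation` is a hypothetical helper name introduced only for illustration. It reports the first step that ran before one of its declared upstream steps.

```python
# Dependencies transcribed from the AsIter side inputs in the reproduction.
# (Illustrative only; this map and the helper below are not Beam APIs.)
DEPS = {
    'CsvExampleGen': [],
    'StatisticsGen': ['CsvExampleGen'],
    'SchemaGen': ['StatisticsGen'],
    'ExampleValidator': ['StatisticsGen', 'SchemaGen'],
    'Transform': ['CsvExampleGen', 'SchemaGen'],
    'Trainer': ['CsvExampleGen', 'SchemaGen', 'Transform'],
    'latest_blessed_model_resolver': [],
    'Evaluator': ['CsvExampleGen', 'Trainer', 'latest_blessed_model_resolver'],
    'Pusher': ['Trainer', 'Evaluator'],
}


def first_violation(run_order, deps=DEPS):
    """Return (step, missing_dependency) for the first step that ran before
    one of its dependencies, or None if the order respects all of them."""
    seen = set()
    for step in run_order:
        for dep in deps[step]:
            if dep not in seen:
                return (step, dep)
        seen.add(step)
    return None


# Run orders copied from the two logs in the report.
ORDER_2_33_0 = [
    'CsvExampleGen', 'latest_blessed_model_resolver', 'StatisticsGen',
    'SchemaGen', 'ExampleValidator', 'Transform', 'Trainer', 'Evaluator',
    'Pusher',
]
ORDER_MASTER = [
    'CsvExampleGen', 'latest_blessed_model_resolver', 'StatisticsGen',
    'Pusher', 'Evaluator', 'SchemaGen', 'Trainer', 'ExampleValidator',
    'Transform',
]

print(first_violation(ORDER_2_33_0))
print(first_violation(ORDER_MASTER))
```

On the 2.33.0 order the helper finds no violation. On the master order it flags Pusher, which ran before Trainer; this matches the empty side inputs (`[[], []]`) that Pusher logged.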
--
This message was sent by Atlassian Jira
(v8.3.4#803005)