Posted to issues@beam.apache.org by "Jongbin Park (Jira)" <ji...@apache.org> on 2021/10/13 07:33:00 UTC

[jira] [Updated] (BEAM-13040) AsIter side input is not correctly recognized as a dependency.

     [ https://issues.apache.org/jira/browse/BEAM-13040?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jongbin Park updated BEAM-13040:
--------------------------------
    Summary: AsIter side input is not correctly recognized as a dependency.  (was: AsIter side input is not currectly recognized as a dependency.)

> AsIter side input is not correctly recognized as a dependency.
> --------------------------------------------------------------
>
>                 Key: BEAM-13040
>                 URL: https://issues.apache.org/jira/browse/BEAM-13040
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>         Environment: Linux Debian 5.10 x86_64
> Python 3.8.11
>            Reporter: Jongbin Park
>            Priority: P2
>
> The error occurs on current master (HEAD). The latest release (2.33.0) is not affected.
> Example to reproduce: 
> {code:python}
> import unittest
> import apache_beam as beam
> class LoggingFn(beam.DoFn):
>   def __init__(self, name):
>     self._name = name
>   def process(self, element, *side_inputs):
>     print(f'Running {self._name} (side inputs: {[list(s) for s in side_inputs]})')
>     return [self._name]
> class BeamDagTest(unittest.TestCase):
>   def test_dag(self):
>     with beam.Pipeline() as p:
>       root = p | 'CreateRoot' >> beam.Create([None])
>       example_gen = root | 'CsvExampleGen' >> beam.ParDo(
>           LoggingFn('CsvExampleGen'),
>       )
>       statistics_gen = root | 'StatisticsGen' >> beam.ParDo(
>           LoggingFn('StatisticsGen'),
>           beam.pvalue.AsIter(example_gen),  # AsIter to specify upstream task dependency.
>       )
>       schema_gen = root | 'SchemaGen' >> beam.ParDo(
>           LoggingFn('SchemaGen'),
>           beam.pvalue.AsIter(statistics_gen),
>       )
>       example_validator = root | 'ExampleValidator' >> beam.ParDo(
>           LoggingFn('ExampleValidator'),
>           beam.pvalue.AsIter(statistics_gen),
>           beam.pvalue.AsIter(schema_gen),
>       )
>       transform = root | 'Transform' >> beam.ParDo(
>           LoggingFn('Transform'),
>           beam.pvalue.AsIter(example_gen),
>           beam.pvalue.AsIter(schema_gen),
>       )
>       trainer = root | 'Trainer' >> beam.ParDo(
>           LoggingFn('Trainer'),
>           beam.pvalue.AsIter(example_gen),
>           beam.pvalue.AsIter(schema_gen),
>           beam.pvalue.AsIter(transform),
>       )
>       model_resolver = root | 'latest_blessed_model_resolver' >> beam.ParDo(
>           LoggingFn('latest_blessed_model_resolver'),
>       )
>       evaluator = root | 'Evaluator' >> beam.ParDo(
>           LoggingFn('Evaluator'),
>           beam.pvalue.AsIter(example_gen),
>           beam.pvalue.AsIter(trainer),
>           beam.pvalue.AsIter(model_resolver),
>       )
>       pusher = root | 'Pusher' >> beam.ParDo(
>           LoggingFn('Pusher'),
>           beam.pvalue.AsIter(trainer),
>           beam.pvalue.AsIter(evaluator),
>       ){code}
>  
> According to the AsIter [documentation|https://github.com/apache/beam/blob/64ec15fa2208d8f9b5ca5653866e1992fd07f7dc/sdks/python/apache_beam/pvalue.py#L527], the entire PCollection should be made available as a side input before the ParDo runs, which means AsIter can be used to specify a task dependency. However, this does not work on current master (71d7213d98): some steps run before their upstream steps and receive empty side inputs.
>  
> Output with apache-beam==2.33.0:
> {code:java}
> Running CsvExampleGen (side inputs: [])
> Running latest_blessed_model_resolver (side inputs: [])
> Running StatisticsGen (side inputs: [['CsvExampleGen']])
> Running SchemaGen (side inputs: [['StatisticsGen']])
> Running ExampleValidator (side inputs: [['StatisticsGen'], ['SchemaGen']])
> Running Transform (side inputs: [['CsvExampleGen'], ['SchemaGen']])
> Running Trainer (side inputs: [['CsvExampleGen'], ['SchemaGen'], ['Transform']])
> Running Evaluator (side inputs: [['CsvExampleGen'], ['Trainer'], ['latest_blessed_model_resolver']])
> Running Pusher (side inputs: [['Trainer'], ['Evaluator']]){code}
> Output with apache-beam installed from 71d7213d98 (origin/master):
> {code:java}
> Running CsvExampleGen (side inputs: [])
> Running latest_blessed_model_resolver (side inputs: [])
> Running StatisticsGen (side inputs: [['CsvExampleGen']])
> Running Pusher (side inputs: [[], []])
> Running Evaluator (side inputs: [['CsvExampleGen'], [], ['latest_blessed_model_resolver']])
> Running SchemaGen (side inputs: [['StatisticsGen']])
> Running Trainer (side inputs: [['CsvExampleGen'], ['SchemaGen'], []])
> Running ExampleValidator (side inputs: [['StatisticsGen'], ['SchemaGen']])
> Running Transform (side inputs: [['CsvExampleGen'], ['SchemaGen']]){code}
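
A minimal sketch, not part of the original report, for checking mechanically whether a captured log respects the dependencies declared through AsIter in the reproduction above. The names DEPENDENCIES, execution_order and find_violations are hypothetical helpers introduced only for illustration. The sketch assumes each step prints exactly one "Running <name> (side inputs: ...)" line, as LoggingFn does for a single-element input.

```python
import re

# Upstream steps of each step, as declared via AsIter side inputs in the
# reproduction pipeline (hypothetical helper table, transcribed by hand).
DEPENDENCIES = {
    'CsvExampleGen': [],
    'latest_blessed_model_resolver': [],
    'StatisticsGen': ['CsvExampleGen'],
    'SchemaGen': ['StatisticsGen'],
    'ExampleValidator': ['StatisticsGen', 'SchemaGen'],
    'Transform': ['CsvExampleGen', 'SchemaGen'],
    'Trainer': ['CsvExampleGen', 'SchemaGen', 'Transform'],
    'Evaluator': ['CsvExampleGen', 'Trainer', 'latest_blessed_model_resolver'],
    'Pusher': ['Trainer', 'Evaluator'],
}


def execution_order(log_text):
    """Return step names in the order their 'Running ...' lines appear."""
    return re.findall(r'Running (\S+) \(side inputs:', log_text)


def find_violations(order, dependencies=DEPENDENCIES):
    """Return (step, dependency) pairs where the step ran before the
    dependency, or the dependency never ran at all."""
    position = {name: i for i, name in enumerate(order)}
    violations = []
    for step in order:
        for dep in dependencies[step]:
            if dep not in position or position[dep] > position[step]:
                violations.append((step, dep))
    return violations


# Execution order transcribed from the origin/master output above.
master_order = [
    'CsvExampleGen', 'latest_blessed_model_resolver', 'StatisticsGen',
    'Pusher', 'Evaluator', 'SchemaGen', 'Trainer', 'ExampleValidator',
    'Transform',
]
master_violations = find_violations(master_order)
```

For the origin/master order, the flagged pairs are (Pusher, Trainer), (Pusher, Evaluator), (Evaluator, Trainer) and (Trainer, Transform), which correspond one to one to the empty side inputs in that log. The 2.33.0 order yields no violations.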


