Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/04 21:55:15 UTC

[GitHub] [beam] damccorm opened a new issue, #21228: AsIter side input is not correctly recognized as a dependency.

damccorm opened a new issue, #21228:
URL: https://github.com/apache/beam/issues/21228

   The error occurs on the current master (HEAD). It does not occur on the latest release (2.33.0).
   
   Example to reproduce:
   ```
   import unittest

   import apache_beam as beam


   class LoggingFn(beam.DoFn):

     def __init__(self, name):
       self._name = name

     def process(self, element, *side_inputs):
       # Materialize each side input so the log shows what this task received.
       print(f'Running {self._name} '
             f'(side inputs: {[list(s) for s in side_inputs]})')
       return [self._name]


   class BeamDagTest(unittest.TestCase):

     def test_dag(self):
       with beam.Pipeline() as p:
         root = p | 'CreateRoot' >> beam.Create([None])

         example_gen = root | 'CsvExampleGen' >> beam.ParDo(
             LoggingFn('CsvExampleGen'),
         )
         statistics_gen = root | 'StatisticsGen' >> beam.ParDo(
             LoggingFn('StatisticsGen'),
             beam.pvalue.AsIter(example_gen),  # AsIter to specify upstream task dependency.
         )
         schema_gen = root | 'SchemaGen' >> beam.ParDo(
             LoggingFn('SchemaGen'),
             beam.pvalue.AsIter(statistics_gen),
         )
         example_validator = root | 'ExampleValidator' >> beam.ParDo(
             LoggingFn('ExampleValidator'),
             beam.pvalue.AsIter(statistics_gen),
             beam.pvalue.AsIter(schema_gen),
         )
         transform = root | 'Transform' >> beam.ParDo(
             LoggingFn('Transform'),
             beam.pvalue.AsIter(example_gen),
             beam.pvalue.AsIter(schema_gen),
         )
         trainer = root | 'Trainer' >> beam.ParDo(
             LoggingFn('Trainer'),
             beam.pvalue.AsIter(example_gen),
             beam.pvalue.AsIter(schema_gen),
             beam.pvalue.AsIter(transform),
         )
         model_resolver = root | 'latest_blessed_model_resolver' >> beam.ParDo(
             LoggingFn('latest_blessed_model_resolver'),
         )
         evaluator = root | 'Evaluator' >> beam.ParDo(
             LoggingFn('Evaluator'),
             beam.pvalue.AsIter(example_gen),
             beam.pvalue.AsIter(trainer),
             beam.pvalue.AsIter(model_resolver),
         )
         pusher = root | 'Pusher' >> beam.ParDo(
             LoggingFn('Pusher'),
             beam.pvalue.AsIter(trainer),
             beam.pvalue.AsIter(evaluator),
         )


   if __name__ == '__main__':
     unittest.main()
   ```
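The reproduction uses `AsIter` purely to declare ordering between tasks. As a minimal sketch in plain Python (no Beam involved), the dependency graph it declares can be written down explicitly and ordered with Kahn's topological sort. `DEPENDENCIES` and `topological_order` are hypothetical helpers for illustration only, not part of Beam, and this is not a claim about how any Beam runner schedules stages; it only shows the constraint that side inputs are expected to enforce.

```python
from collections import deque

# Hypothetical encoding of the reproduction above: each task maps to the
# upstream tasks it consumes through beam.pvalue.AsIter side inputs.
DEPENDENCIES = {
    'CsvExampleGen': [],
    'latest_blessed_model_resolver': [],
    'StatisticsGen': ['CsvExampleGen'],
    'SchemaGen': ['StatisticsGen'],
    'ExampleValidator': ['StatisticsGen', 'SchemaGen'],
    'Transform': ['CsvExampleGen', 'SchemaGen'],
    'Trainer': ['CsvExampleGen', 'SchemaGen', 'Transform'],
    'Evaluator': ['CsvExampleGen', 'Trainer', 'latest_blessed_model_resolver'],
    'Pusher': ['Trainer', 'Evaluator'],
}


def topological_order(deps):
    """Return an order in which every task comes after all of its upstream tasks.

    Kahn's algorithm: repeatedly run a task whose upstream tasks have all run.
    """
    pending = {task: len(upstream) for task, upstream in deps.items()}
    downstream = {task: [] for task in deps}
    for task, upstream in deps.items():
        for up in upstream:
            downstream[up].append(task)

    ready = deque(task for task, count in pending.items() if count == 0)
    order = []
    while ready:
        task = ready.popleft()
        order.append(task)
        for child in downstream[task]:
            pending[child] -= 1
            if pending[child] == 0:
                ready.append(child)

    if len(order) != len(deps):
        raise ValueError('dependency cycle detected')
    return order


print(topological_order(DEPENDENCIES))
```

For this graph the function returns CsvExampleGen, latest_blessed_model_resolver, StatisticsGen, SchemaGen, ExampleValidator, Transform, Trainer, Evaluator, Pusher, which matches the order printed by apache-beam 2.33.0 in the output reported in this issue.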
   
    
   According to the AsIter [documentation](https://github.com/apache/beam/blob/64ec15fa2208d8f9b5ca5653866e1992fd07f7dc/sdks/python/apache_beam/pvalue.py#L527), the entire PCollection is made available as a side input, which means the PTransform producing the side input must run before the PTransform consuming it. We have relied on this behavior to run a DAG of tasks, injecting each task's upstream dependencies as side inputs. This mechanism no longer works on the current master (71d7213d98): Pusher, Evaluator and Trainer run before some of their upstream tasks and receive those side inputs as empty lists. Compare the two runs:
   
   Output with apache-beam==2.33.0:
   ```
   Running CsvExampleGen (side inputs: [])
   Running latest_blessed_model_resolver (side inputs: [])
   Running StatisticsGen (side inputs: [['CsvExampleGen']])
   Running SchemaGen (side inputs: [['StatisticsGen']])
   Running ExampleValidator (side inputs: [['StatisticsGen'], ['SchemaGen']])
   Running Transform (side inputs: [['CsvExampleGen'], ['SchemaGen']])
   Running Trainer (side inputs: [['CsvExampleGen'], ['SchemaGen'], ['Transform']])
   Running Evaluator (side inputs: [['CsvExampleGen'], ['Trainer'], ['latest_blessed_model_resolver']])
   Running Pusher (side inputs: [['Trainer'], ['Evaluator']])
   ```
   
   Output with apache-beam installed from 71d7213d98 (origin/master):
   ```
   Running CsvExampleGen (side inputs: [])
   Running latest_blessed_model_resolver (side inputs: [])
   Running StatisticsGen (side inputs: [['CsvExampleGen']])
   Running Pusher (side inputs: [[], []])
   Running Evaluator (side inputs: [['CsvExampleGen'], [], ['latest_blessed_model_resolver']])
   Running SchemaGen (side inputs: [['StatisticsGen']])
   Running Trainer (side inputs: [['CsvExampleGen'], ['SchemaGen'], []])
   Running ExampleValidator (side inputs: [['StatisticsGen'], ['SchemaGen']])
   Running Transform (side inputs: [['CsvExampleGen'], ['SchemaGen']])
   ```
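Because every upstream task in the reproduction emits exactly one element (its own name), an empty list in the log most likely means the consumer read that side input before its producer had written anything. A minimal sketch that scans such a log for empty side inputs is shown here; it assumes the exact `Running <name> (side inputs: ...)` format printed by `LoggingFn` above, and `tasks_with_empty_side_inputs` is a hypothetical helper, not a Beam API.

```python
import ast
import re

# Log lines copied from the origin/master output above (line wrapping removed).
MASTER_LOG = """\
Running CsvExampleGen (side inputs: [])
Running latest_blessed_model_resolver (side inputs: [])
Running StatisticsGen (side inputs: [['CsvExampleGen']])
Running Pusher (side inputs: [[], []])
Running Evaluator (side inputs: [['CsvExampleGen'], [], ['latest_blessed_model_resolver']])
Running SchemaGen (side inputs: [['StatisticsGen']])
Running Trainer (side inputs: [['CsvExampleGen'], ['SchemaGen'], []])
Running ExampleValidator (side inputs: [['StatisticsGen'], ['SchemaGen']])
Running Transform (side inputs: [['CsvExampleGen'], ['SchemaGen']])
"""

# Captures the task name and the printed list of side-input contents.
LINE_RE = re.compile(r'^Running (\S+) \(side inputs: (.*)\)$')


def tasks_with_empty_side_inputs(log):
    """Map each task to the positions of side inputs that arrived empty."""
    result = {}
    for line in log.splitlines():
        match = LINE_RE.match(line)
        if not match:
            continue
        name = match.group(1)
        side_inputs = ast.literal_eval(match.group(2))  # e.g. [['Trainer'], []]
        empty = [i for i, values in enumerate(side_inputs) if not values]
        if empty:
            result[name] = empty
    return result


print(tasks_with_empty_side_inputs(MASTER_LOG))
```

On the master output this flags Pusher (both side inputs, Trainer and Evaluator), Evaluator (its second side input, Trainer) and Trainer (its third side input, Transform); on the 2.33.0 output it flags nothing.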
   
   
   Imported from Jira [BEAM-13040](https://issues.apache.org/jira/browse/BEAM-13040). Original Jira may contain additional context.
   Reported by: Park.

