Posted to issues@beam.apache.org by "Beam JIRA Bot (Jira)" <ji...@apache.org> on 2020/08/10 17:07:19 UTC

[jira] [Updated] (BEAM-8782) Python typehints: with_output_types breaks multi-output dofns

     [ https://issues.apache.org/jira/browse/BEAM-8782?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Beam JIRA Bot updated BEAM-8782:
--------------------------------
    Labels: stale-P2  (was: )

> Python typehints: with_output_types breaks multi-output dofns
> -------------------------------------------------------------
>
>                 Key: BEAM-8782
>                 URL: https://issues.apache.org/jira/browse/BEAM-8782
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>            Reporter: Udi Meiri
>            Priority: P2
>              Labels: stale-P2
>
> {code}
>   def test_typed_multi_pardo(self):
>     p = TestPipeline()
>     res = (p
>            | beam.Create([1, 2, 3])
>            | beam.Map(lambda e: e).with_outputs().with_output_types(int))
>     self.assertIsNotNone(res[None].element_type)
>     res_main = (res[None]
>                 | 'id_none' >> beam.ParDo(lambda e: [e]).with_input_types(int))
>     assert_that(res_main, equal_to([1, 2, 3]), label='none_check')
>     p.run()
> {code}
> Fails with:
> {code}
> typed_pipeline_test.py:212: 
> _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
> ../pvalue.py:113: in __or__
>     return self.pipeline.apply(ptransform, self)
> ../pipeline.py:528: in apply
>     transform.type_check_outputs(pvalueish_result)
> ../transforms/ptransform.py:386: in type_check_outputs
>     self.type_check_inputs_or_outputs(pvalueish, 'output')
> ../transforms/ptransform.py:401: in type_check_inputs_or_outputs
>     if pvalue_.element_type is None:
> ../pvalue.py:241: in __getattr__
>     return self[tag]
> _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
> self = <DoOutputsTuple main_tag=None tags=() transform=<ParDo(PTransform) label=[Map(<lambda at typed_pipeline_test.py:212>)]> at 0x7fa9513f3048>
> tag = 'element_type'
>     def __getitem__(self, tag):
>       # Accept int tags so that we can look at Partition tags with the
>       # same ints that we used in the partition function.
>       # TODO(gildea): Consider requiring string-based tags everywhere.
>       # This will require a partition function that does not return ints.
>       if isinstance(tag, int):
>         tag = str(tag)
>       if tag == self._main_tag:
>         tag = None
>       elif self._tags and tag not in self._tags:
>         raise ValueError(
>             "Tag '%s' is neither the main tag '%s' "
>             "nor any of the tags %s" % (
>                 tag, self._main_tag, self._tags))
>       # Check if we accessed this tag before.
>       if tag in self._pcolls:
>         return self._pcolls[tag]
>     
>       if tag is not None:
>         self._transform.output_tags.add(tag)
>         pcoll = PCollection(self._pipeline, tag=tag, element_type=typehints.Any)
>         # Transfer the producer from the DoOutputsTuple to the resulting
>         # PCollection.
> >       pcoll.producer = self.producer.parts[0]
> E       AttributeError: 'NoneType' object has no attribute 'parts'
> ../pvalue.py:266: AttributeError
> {code}
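
Reading the traceback above: the type check reads pvalue_.element_type on the DoOutputsTuple, __getattr__ forwards that name to __getitem__ as if it were an output tag, and __getitem__ then dereferences self.producer, which is still None at that point. The following is a minimal sketch of that mechanism only, not Beam code. FakeOutputsTuple and its fields are hypothetical stand-ins, and the sketch assumes nothing about Beam beyond what the traceback shows.

```python
class FakeOutputsTuple:
    """Hypothetical stand-in for DoOutputsTuple; illustrates the pattern only."""

    def __init__(self, producer=None):
        # In the traceback the producer has not been set yet, so it is None.
        self.producer = producer
        self._pcolls = {}

    def __getattr__(self, tag):
        # Called only when normal attribute lookup fails, so any unknown
        # attribute name (such as 'element_type') is treated as a tag.
        return self[tag]

    def __getitem__(self, tag):
        # Return a cached entry if this tag was accessed before.
        if tag in self._pcolls:
            return self._pcolls[tag]
        # With producer still None, this line raises AttributeError.
        part = self.producer.parts[0]
        self._pcolls[tag] = part
        return part


def read_element_type(outputs):
    """Return the error message raised by reading element_type, or None."""
    try:
        outputs.element_type
    except AttributeError as exc:
        return str(exc)
    return None


if __name__ == "__main__":
    print(read_element_type(FakeOutputsTuple()))
```

Because the error surfaces as an AttributeError raised from inside __getattr__, it can look like an ordinary missing attribute, even though the real cause is the unset producer.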
