Posted to issues@beam.apache.org by "Udi Meiri (Jira)" <ji...@apache.org> on 2019/11/18 21:37:00 UTC

[jira] [Commented] (BEAM-4132) Element type inference doesn't work for multi-output DoFns

    [ https://issues.apache.org/jira/browse/BEAM-4132?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16976906#comment-16976906 ] 

Udi Meiri commented on BEAM-4132:
---------------------------------

PCollections have element_type, inherited from PValue. The default value is None, which is not a valid value for a type.
Currently, the solution for DoOutputsTuple is to always set the type to Any.

The types should be derived from one of:
1. Tagged type hints. This feature doesn't exist yet.
2. Inferred result/output type. Pipeline._infer_result_type doesn't seem to work on undeclared tags (when DoOutputsTuple._tags is empty).
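
The precedence described above could be sketched roughly as follows. This is a standalone illustration only, not Beam code: resolve_element_type and its two dictionary arguments are hypothetical names made up for this example, and the fallback to Any simply mirrors the current DoOutputsTuple behavior mentioned earlier.

```python
from typing import Any, Dict


# Hypothetical helper, not part of Apache Beam: picks the element type for one
# tagged output using the precedence listed in the comment above.
def resolve_element_type(
    tag: str,
    tagged_hints: Dict[str, Any],
    inferred_types: Dict[str, Any],
) -> Any:
  # 1. An explicit per-tag type hint wins when one was declared.
  if tag in tagged_hints:
    return tagged_hints[tag]
  # 2. Otherwise use the type inferred for that tag, if inference produced one.
  if tag in inferred_types:
    return inferred_types[tag]
  # 3. Fall back to Any instead of None, because None is not a valid type.
  return Any


if __name__ == '__main__':
  hints = {'ten_times': int}          # assumed declared hint
  inferred = {'hundred_times': int}   # assumed inference result
  for tag in ('ten_times', 'hundred_times', 'undeclared'):
    print(tag, resolve_element_type(tag, hints, inferred))
```

An undeclared tag that has neither a hint nor an inferred type ends up as Any in this sketch, so downstream type checks would stay permissive instead of failing on None.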


> Element type inference doesn't work for multi-output DoFns
> ----------------------------------------------------------
>
>                 Key: BEAM-4132
>                 URL: https://issues.apache.org/jira/browse/BEAM-4132
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>    Affects Versions: 2.4.0
>            Reporter: Chuan Yu Foo
>            Assignee: Udi Meiri
>            Priority: Major
>          Time Spent: 2.5h
>  Remaining Estimate: 0h
>
> TLDR: if you have a multi-output DoFn, then the non-main PCollections will incorrectly have their element types set to None. This affects type checking for pipelines involving these PCollections.
> Minimal example:
> {code}
> import apache_beam as beam
> class TripleDoFn(beam.DoFn):
>   def process(self, elem):
>     yield elem
>     if elem % 2 == 0:
>       yield beam.pvalue.TaggedOutput('ten_times', elem * 10)
>     if elem % 3 == 0:
>       yield beam.pvalue.TaggedOutput('hundred_times', elem * 100)
>       
> @beam.typehints.with_input_types(int)
> @beam.typehints.with_output_types(int)
> class MultiplyBy(beam.DoFn):
>   def __init__(self, multiplier):
>     self._multiplier = multiplier
>   def process(self, elem):
>     yield elem * self._multiplier
>   
> def main():
>   with beam.Pipeline() as p:
>     x, a, b = (
>       p
>       | 'Create' >> beam.Create([1, 2, 3])
>       | 'TripleDo' >> beam.ParDo(TripleDoFn()).with_outputs(
>         'ten_times', 'hundred_times', main='main_output'))
>     _ = a | 'MultiplyBy2' >> beam.ParDo(MultiplyBy(2))
> if __name__ == '__main__':
>   main()    
> {code}
> Running this yields the following error:
> {noformat}
> apache_beam.typehints.decorators.TypeCheckError: Type hint violation for 'MultiplyBy2': requires <type 'int'> but got None for elem
> {noformat}
> Replacing {{a}} with {{b}} yields the same error. Replacing {{a}} with {{x}} instead yields the following error:
> {noformat}
> apache_beam.typehints.decorators.TypeCheckError: Type hint violation for 'MultiplyBy2': requires <type 'int'> but got Union[TaggedOutput, int] for elem
> {noformat}
> I would expect Beam to correctly infer that {{a}} and {{b}} have element types of {{int}} rather than {{None}}, and I would also expect Beam to correctly figure out that the element types of {{x}} are compatible with {{int}}.


