You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@beam.apache.org by "Udi Meiri (Jira)" <ji...@apache.org> on 2019/11/20 01:07:00 UTC

[jira] [Created] (BEAM-8782) Python typehints: with_output_types breaks multi-output dofns

Udi Meiri created BEAM-8782:
-------------------------------

             Summary: Python typehints: with_output_types breaks multi-output dofns
                 Key: BEAM-8782
                 URL: https://issues.apache.org/jira/browse/BEAM-8782
             Project: Beam
          Issue Type: Bug
          Components: sdk-py-core
            Reporter: Udi Meiri
            Assignee: Udi Meiri


{code}
  def test_typed_multi_pardo(self):
    p = TestPipeline()
    res = (p
           | beam.Create([1, 2, 3])
           | beam.Map(lambda e: e).with_outputs().with_output_types(int))
    self.assertIsNotNone(res[None].element_type)
    res_main = (res[None]
                | 'id_none' >> beam.ParDo(lambda e: [e]).with_input_types(int))
    assert_that(res_main, equal_to([1, 2, 3]), label='none_check')
    p.run()
{code}

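Fails with the traceback below: the output type check reads `element_type` on the DoOutputsTuple, whose `__getattr__` treats the attribute name as an output tag and calls `__getitem__`, which then raises an AttributeError because `self.producer` is None: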
{code}
typed_pipeline_test.py:212: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
../pvalue.py:113: in __or__
    return self.pipeline.apply(ptransform, self)
../pipeline.py:528: in apply
    transform.type_check_outputs(pvalueish_result)
../transforms/ptransform.py:386: in type_check_outputs
    self.type_check_inputs_or_outputs(pvalueish, 'output')
../transforms/ptransform.py:401: in type_check_inputs_or_outputs
    if pvalue_.element_type is None:
../pvalue.py:241: in __getattr__
    return self[tag]
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <DoOutputsTuple main_tag=None tags=() transform=<ParDo(PTransform) label=[Map(<lambda at typed_pipeline_test.py:212>)]> at 0x7fa9513f3048>
tag = 'element_type'

    def __getitem__(self, tag):
      # Accept int tags so that we can look at Partition tags with the
      # same ints that we used in the partition function.
      # TODO(gildea): Consider requiring string-based tags everywhere.
      # This will require a partition function that does not return ints.
      if isinstance(tag, int):
        tag = str(tag)
      if tag == self._main_tag:
        tag = None
      elif self._tags and tag not in self._tags:
        raise ValueError(
            "Tag '%s' is neither the main tag '%s' "
            "nor any of the tags %s" % (
                tag, self._main_tag, self._tags))
      # Check if we accessed this tag before.
      if tag in self._pcolls:
        return self._pcolls[tag]
    
      if tag is not None:
        self._transform.output_tags.add(tag)
        pcoll = PCollection(self._pipeline, tag=tag, element_type=typehints.Any)
        # Transfer the producer from the DoOutputsTuple to the resulting
        # PCollection.
>       pcoll.producer = self.producer.parts[0]
E       AttributeError: 'NoneType' object has no attribute 'parts'

../pvalue.py:266: AttributeError
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)