You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@beam.apache.org by "Udi Meiri (Jira)" <ji...@apache.org> on 2019/11/20 01:07:00 UTC
[jira] [Created] (BEAM-8782) Python typehints: with_output_types breaks multi-output dofns
Udi Meiri created BEAM-8782:
-------------------------------
Summary: Python typehints: with_output_types breaks multi-output dofns
Key: BEAM-8782
URL: https://issues.apache.org/jira/browse/BEAM-8782
Project: Beam
Issue Type: Bug
Components: sdk-py-core
Reporter: Udi Meiri
Assignee: Udi Meiri
{code}
def test_typed_multi_pardo(self):
p = TestPipeline()
res = (p
| beam.Create([1, 2, 3])
| beam.Map(lambda e: e).with_outputs().with_output_types(int))
self.assertIsNotNone(res[None].element_type)
res_main = (res[None]
| 'id_none' >> beam.ParDo(lambda e: [e]).with_input_types(int))
assert_that(res_main, equal_to([1, 2, 3]), label='none_check')
p.run()
{code}
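Fails with the following error (the output type check reads `element_type` on the DoOutputsTuple, which falls through to `__getattr__` and is treated as an output tag):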
{code}
typed_pipeline_test.py:212:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../pvalue.py:113: in __or__
return self.pipeline.apply(ptransform, self)
../pipeline.py:528: in apply
transform.type_check_outputs(pvalueish_result)
../transforms/ptransform.py:386: in type_check_outputs
self.type_check_inputs_or_outputs(pvalueish, 'output')
../transforms/ptransform.py:401: in type_check_inputs_or_outputs
if pvalue_.element_type is None:
../pvalue.py:241: in __getattr__
return self[tag]
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <DoOutputsTuple main_tag=None tags=() transform=<ParDo(PTransform) label=[Map(<lambda at typed_pipeline_test.py:212>)]> at 0x7fa9513f3048>
tag = 'element_type'
def __getitem__(self, tag):
# Accept int tags so that we can look at Partition tags with the
# same ints that we used in the partition function.
# TODO(gildea): Consider requiring string-based tags everywhere.
# This will require a partition function that does not return ints.
if isinstance(tag, int):
tag = str(tag)
if tag == self._main_tag:
tag = None
elif self._tags and tag not in self._tags:
raise ValueError(
"Tag '%s' is neither the main tag '%s' "
"nor any of the tags %s" % (
tag, self._main_tag, self._tags))
# Check if we accessed this tag before.
if tag in self._pcolls:
return self._pcolls[tag]
if tag is not None:
self._transform.output_tags.add(tag)
pcoll = PCollection(self._pipeline, tag=tag, element_type=typehints.Any)
# Transfer the producer from the DoOutputsTuple to the resulting
# PCollection.
> pcoll.producer = self.producer.parts[0]
E AttributeError: 'NoneType' object has no attribute 'parts'
../pvalue.py:266: AttributeError
{code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)