Posted to issues@beam.apache.org by "Kyle Weaver (Jira)" <ji...@apache.org> on 2022/04/05 20:14:00 UTC

[jira] [Updated] (BEAM-14228) Nullable Integer support in with pandas not working as expected

     [ https://issues.apache.org/jira/browse/BEAM-14228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kyle Weaver updated BEAM-14228:
-------------------------------
    Status: Open  (was: Triage Needed)

> Nullable Integer support in with pandas not working as expected
> ---------------------------------------------------------------
>
>                 Key: BEAM-14228
>                 URL: https://issues.apache.org/jira/browse/BEAM-14228
>             Project: Beam
>          Issue Type: Bug
>          Components: dsl-dataframe
>    Affects Versions: 2.37.0
>            Reporter: Markus Kohler
>            Assignee: Brian Hulette
>            Priority: P2
>
> I am reading data from a Parquet file, and one of the columns is a nullable integer (https://pandas.pydata.org/docs/user_guide/integer_na.html#integer-na).
> I'm not 100% sure I declared it correctly:
>  
> {code:python}
> import typing
> from typing import Dict, Iterable, List, Optional
>
> import apache_beam as beam
> from apache_beam.options.pipeline_options import PipelineOptions
>
> class Record(typing.NamedTuple):
>     port: Optional[int]
>     #port: str
>
> # Columns to read: the public field names of Record.
> recFields = set([i for i in Record.__dict__.keys() if i[:1] != '_'])
> beam.coders.registry.register_coder(Record, beam.coders.RowCoder)
>
> def extractDF(tuple):
>     # With with_filename=True, each element is a (filename, batch) pair.
>     df = tuple[1].to_pandas()
>     print(type(df.port.dtype))
>     return df
>
> input_patterns = ['data/*.parquet']
> #local runner
> options = PipelineOptions(flags=[], type_check_additional='all')
>
> def toRecords(df):
>     #df["port"]=None
>     return df.to_dict('records')
>
> with beam.Pipeline(options=options) as pipeline:
>     lines = (
>         pipeline
>         | 'Create file patterns' >> beam.Create(input_patterns)
>         | 'Read Parquet files' >> beam.io.ReadAllFromParquetBatched(columns=recFields, with_filename=True)
>         | 'Extract DF' >> beam.Map(extractDF)
>         | 'To dictionaries' >> beam.FlatMap(toRecords)
>         | 'ToRows' >> beam.Map(lambda x: Record(**x)).with_output_types(Record)
>         | "print" >> beam.Map(print))
> {code}
> This fails with a type error.
> When I uncomment the line in toRecords that sets every port value to None, it works fine.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)