Posted to issues@beam.apache.org by "Beam JIRA Bot (Jira)" <ji...@apache.org> on 2021/02/21 17:17:01 UTC

[jira] [Updated] (BEAM-11513) Unable to load Pandas dataframe from BigQuery

     [ https://issues.apache.org/jira/browse/BEAM-11513?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Beam JIRA Bot updated BEAM-11513:
---------------------------------
    Labels: stale-P2  (was: )

> Unable to load Pandas dataframe from BigQuery
> ---------------------------------------------
>
>                 Key: BEAM-11513
>                 URL: https://issues.apache.org/jira/browse/BEAM-11513
>             Project: Beam
>          Issue Type: Bug
>          Components: beam-model
>    Affects Versions: 2.26.0
>            Reporter: Valliappa Lakshmanan
>            Priority: P2
>              Labels: stale-P2
>
> Running this pipeline:
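> {code:python}
> import apache_beam as beam
> from apache_beam.dataframe.convert import to_dataframe
>
> # to_unixtime() and get_delay_at_top_airports() are user-defined helpers (not shown).
> query = """
> SELECT
>   date,
>   airline,
>   departure_airport,
>   arrival_airport,
>   departure_delay,
>   arrival_delay
> FROM `bigquery-samples.airline_ontime_data.flights`
> """
> with beam.Pipeline() as p:
>     # Backtick table references are standard SQL; ReadFromBigQuery defaults to legacy SQL.
>     tbl = p | 'read table' >> beam.io.ReadFromBigQuery(
>         query=query, use_standard_sql=True)
>     # Use each row's date as its event timestamp, then group rows into daily windows.
>     tbl = tbl | 'assign ts' >> beam.Map(
>         lambda x: beam.window.TimestampedValue(x, to_unixtime(x['date'])))
>     daily = tbl | 'daily windows' >> beam.WindowInto(beam.window.FixedWindows(60*60*24))
>     df = to_dataframe(daily)  # raises the TypeError below
>     result = df.groupby('airline').apply(get_delay_at_top_airports)
>     result.to_csv('output.csv')
> {code}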
> fails at the to_dataframe() call with this error:
> {code}
> ---------------------------------------------------------------------------
> TypeError                                 Traceback (most recent call last)
> <ipython-input-41-f47b2674428f> in <module>
>      13         lambda x: beam.window.TimestampedValue(x, to_unixtime(x['date'])))
>      14     daily = tbl | 'daily windows' >> beam.WindowInto(beam.window.FixedWindows(60*60*24))
> ---> 15     df = to_dataframe(daily)
>      16     result = df.groupby('airline').apply(get_delay_at_top_airports)
>      17     result.to_csv('output.csv')
> /opt/conda/lib/python3.7/site-packages/apache_beam/dataframe/convert.py in to_dataframe(pcoll, proxy, label)
>      69       # the name of these variables in the calling context.
>      70       label = 'BatchElements(%s)' % _var_name(pcoll, 2)
> ---> 71     proxy = schemas.generate_proxy(pcoll.element_type)
>      72     pcoll = pcoll | label >> schemas.BatchRowsAsDataFrame(proxy=proxy)
>      73   return frame_base.DeferredFrame.wrap(
> /opt/conda/lib/python3.7/site-packages/apache_beam/dataframe/schemas.py in generate_proxy(element_type)
>     178 
>     179   else:
> --> 180     fields = named_fields_from_element_type(element_type)
>     181     proxy = pd.DataFrame(columns=[name for name, _ in fields])
>     182     for name, typehint in fields:
> /opt/conda/lib/python3.7/site-packages/apache_beam/typehints/schemas.py in named_fields_from_element_type(element_type)
>     298 def named_fields_from_element_type(
>     299     element_type):  # (type) -> typing.List[typing.Tuple[unicode, type]]
> --> 300   return named_fields_from_schema(schema_from_element_type(element_type))
>     301 
>     302 
> /opt/conda/lib/python3.7/site-packages/apache_beam/typehints/schemas.py in schema_from_element_type(element_type)
>     293     raise TypeError(
>     294         "Attempted to determine schema for unsupported type '%s'" %
> --> 295         element_type)
>     296 
>     297 
> TypeError: Attempted to determine schema for unsupported type 'Any'{code}


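The report does not show to_unixtime. A minimal stdlib sketch of such a helper, assuming the date column holds 'YYYY-MM-DD' strings interpreted as UTC midnight, is below; to_unixtime, daily_window_start and DAY_SECONDS are hypothetical names. daily_window_start is meant to mirror how FixedWindows(60*60*24) with a zero offset assigns a timestamp to a window (start = timestamp - timestamp % size).

```python
from datetime import datetime, timezone

DAY_SECONDS = 60 * 60 * 24  # same size as FixedWindows(60*60*24) in the report


def to_unixtime(date_str: str) -> float:
    """Hypothetical helper: parse 'YYYY-MM-DD' as UTC midnight, return Unix seconds."""
    dt = datetime.strptime(date_str, '%Y-%m-%d').replace(tzinfo=timezone.utc)
    return dt.timestamp()


def daily_window_start(timestamp: float) -> float:
    """Start of the one-day fixed window (offset 0) containing timestamp."""
    return timestamp - timestamp % DAY_SECONDS


ts = to_unixtime('2008-05-13')
print(ts)
# An event one hour after midnight lands in the same daily window.
print(daily_window_start(ts + 3600) == ts)
```

Because every timestamp produced this way is exactly midnight UTC, each flight row falls into the window for its own calendar day.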

--
This message was sent by Atlassian Jira
(v8.3.4#803005)