Posted to issues@beam.apache.org by "Kenneth Knowles (Jira)" <ji...@apache.org> on 2021/04/03 02:23:00 UTC

[jira] [Updated] (BEAM-10544) Select Types not equal with nested schema

     [ https://issues.apache.org/jira/browse/BEAM-10544?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kenneth Knowles updated BEAM-10544:
-----------------------------------
    Status: Open  (was: Triage Needed)

> Select Types not equal with nested schema
> -----------------------------------------
>
>                 Key: BEAM-10544
>                 URL: https://issues.apache.org/jira/browse/BEAM-10544
>             Project: Beam
>          Issue Type: Bug
>          Components: dsl-sql, sdk-java-core
>            Reporter: Jacob Ferriero
>            Priority: P3
>              Labels: Clarified
>
> When using SqlTransform to join a table with a large nested schema to a flat table, we get a "Types not equal" error from Select [1].
> We cannot get the test of our SqlTransform usage to pass on the DirectRunner. All code is checked into Cloud Source Repositories (CSR) [2].
> Things of note:
> - Query planner: Calcite
> Query (the real business logic is much more complex, but this is sufficient to reproduce the issue in our test):
> ```sql
> SELECT
>     t1.DeviceName AS DeviceName,
>     t1.LinkName AS LinkName,
>     t1.HostName AS HostName,
>     t1.MeasuredAt AS MeasuredAt,
>     t2.b_dBm AS b_dBm
> FROM
>     RealtimeRows AS t1
>   INNER JOIN
>   --BigQuery Dimension Side Input
>     TxPowerSideInput AS t2
>   ON
>     t1.DeviceName = t2.DeviceName
> ```
> Tables are created like so (in the real pipeline, the realtime data arrives from Pub/Sub):
> ```java
>     // This table has the same schema as the real incoming Pub/Sub messages
>     // in the real-world use case.
>     PCollection<Row> realtimeTestData = pipeline
>         .apply("Read 1Hz staging",
>             BigQueryIO
>                 .readTableRowsWithSchema()
>                 .fromQuery(
>                     "SELECT * FROM `taara-db.jake_views.staging_sample_float`")
>                 .usingStandardSql())
>         .apply(Convert.toRows());
>     PCollection<Row> txPowerCalcRows = pipeline
>         .apply("Read Tx Power Calc Side Input",
>             BigQueryIO
>                 .readTableRowsWithSchema()
>                 .fromQuery(
>                     "SELECT * FROM `taara-db`.MANUFACTURING.tx_power_timeinvariant_calculations")
>                 .usingStandardSql())
>         .apply(Convert.toRows());
> ```
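>
> The snippet below applies SqlTransform to a `tables` value whose construction is not shown. Assuming it is a PCollectionTuple, SqlTransform resolves each table name in the query against the tuple tag ids, so the tags would need to be `RealtimeRows` and `TxPowerSideInput`. A minimal sketch of that wiring follows; the class name `JoinTables` and the helper `buildTables` are hypothetical.
> ```java
> import org.apache.beam.sdk.values.PCollection;
> import org.apache.beam.sdk.values.PCollectionTuple;
> import org.apache.beam.sdk.values.Row;
> import org.apache.beam.sdk.values.TupleTag;
>
> /** Hypothetical helper: bundles the two inputs under the names the SQL query uses. */
> public class JoinTables {
>   // Tag ids become SQL table names, so they must match the FROM / JOIN clauses.
>   static final TupleTag<Row> REALTIME = new TupleTag<>("RealtimeRows");
>   static final TupleTag<Row> TX_POWER = new TupleTag<>("TxPowerSideInput");
>
>   static PCollectionTuple buildTables(
>       PCollection<Row> realtimeTestData, PCollection<Row> txPowerCalcRows) {
>     return PCollectionTuple.of(REALTIME, realtimeTestData)
>         .and(TX_POWER, txPowerCalcRows);
>   }
> }
> ```
> With that in place, `JoinTables.buildTables(realtimeTestData, txPowerCalcRows)` could stand in for `tables`. Using the `TupleTag` overloads of `PCollectionTuple.of` and `and` keeps the element type checked at compile time.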
> Relevant Java snippet, where `sql` holds the query above:
> ```java
>     PCollection<Row> out = tables
>         .apply(
>             "Join to dimension Data",
>             SqlTransform
>                 .query(sql)
>                 .registerUdf("POW", Pow.class)
>                 .registerUdf("SQRT", Sqrt.class)
>                 .registerUdf("LOG10", Log10.class)
>                 .registerUdf("GREATEST", Greatest.class)
>                 .registerUdf("EXTRACT_OFFSET", ExtractArrayOffset.class)
>                 .registerUdf("PARSE_TIMESTAMP", ParseTimestamp.class)
>                 .registerUdf("UNIX_SECONDS", UnixSeconds.class)
>         );
> ```
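>
> Because the BigQuery tables and the CSR repository [2] may not be accessible to everyone, an in-memory variant could help narrow this down on the DirectRunner. The sketch below replaces both BigQuery reads with `Create` and uses a small, hypothetical nested schema (a `Details` row with a single `rx_dBm` field, and `MeasuredAt` typed as INT64) in place of the real `staging_sample_float` schema. It has not been confirmed to trigger the same "Types not equal" failure; the full real schema may be needed for that.
> ```java
> import org.apache.beam.sdk.Pipeline;
> import org.apache.beam.sdk.extensions.sql.SqlTransform;
> import org.apache.beam.sdk.schemas.Schema;
> import org.apache.beam.sdk.transforms.Create;
> import org.apache.beam.sdk.values.PCollection;
> import org.apache.beam.sdk.values.PCollectionTuple;
> import org.apache.beam.sdk.values.Row;
> import org.apache.beam.sdk.values.TupleTag;
>
> public class NestedJoinRepro {
>   // Hypothetical nested row; the real staging schema is much larger.
>   static final Schema DETAILS = Schema.builder().addDoubleField("rx_dBm").build();
>
>   static final Schema REALTIME = Schema.builder()
>       .addStringField("DeviceName")
>       .addStringField("LinkName")
>       .addStringField("HostName")
>       .addInt64Field("MeasuredAt")      // hypothetical type; the real column type is not given
>       .addRowField("Details", DETAILS) // nested ROW field standing in for the real nested columns
>       .build();
>
>   // Flat dimension table, mirroring the side input.
>   static final Schema TX_POWER = Schema.builder()
>       .addStringField("DeviceName")
>       .addDoubleField("b_dBm")
>       .build();
>
>   static final String SQL = "SELECT t1.DeviceName AS DeviceName, t1.LinkName AS LinkName, "
>       + "t1.HostName AS HostName, t1.MeasuredAt AS MeasuredAt, t2.b_dBm AS b_dBm "
>       + "FROM RealtimeRows AS t1 INNER JOIN TxPowerSideInput AS t2 "
>       + "ON t1.DeviceName = t2.DeviceName";
>
>   static PCollection<Row> buildJoin(Pipeline p) {
>     Row details = Row.withSchema(DETAILS).addValue(-12.5).build();
>     PCollection<Row> realtime = p.apply("RealtimeRows",
>         Create.of(Row.withSchema(REALTIME)
>                 .addValues("dev-1", "link-1", "host-1", 1_600_000_000L, details)
>                 .build())
>             .withRowSchema(REALTIME));
>     PCollection<Row> txPower = p.apply("TxPowerSideInput",
>         Create.of(Row.withSchema(TX_POWER).addValues("dev-1", 3.0).build())
>             .withRowSchema(TX_POWER));
>     // Tag ids are the table names referenced by the query.
>     return PCollectionTuple.of(new TupleTag<>("RealtimeRows"), realtime)
>         .and(new TupleTag<>("TxPowerSideInput"), txPower)
>         .apply("Join to dimension Data", SqlTransform.query(SQL));
>   }
>
>   public static void main(String[] args) {
>     Pipeline p = Pipeline.create();
>     buildJoin(p);
>     p.run().waitUntilFinish();
>   }
> }
> ```
> Running `main` executes the join on the DirectRunner, assuming `beam-runners-direct-java` and `beam-sdks-java-extensions-sql` are on the classpath.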
> [1] https://github.com/apache/beam/blob/b564239081e9351c56fb0e7d263495b95dd3f8f3/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Select.java#L203
> [2] https://source.cloud.google.com/taara-db/pso-taara-realtime-margin/+/master:streaming-join/streaming-join/src/test/java/com/google/x/taara/dataflow/transforms/RxTxPowersCorrFERCombinedSqlTransformIT.java



--
This message was sent by Atlassian Jira
(v8.3.4#803005)