Posted to issues@beam.apache.org by "Kenneth Knowles (JIRA)" <ji...@apache.org> on 2019/01/31 07:30:00 UTC

[jira] [Updated] (BEAM-6566) SqlTransform does not work for beam version above 2.6.0

     [ https://issues.apache.org/jira/browse/BEAM-6566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kenneth Knowles updated BEAM-6566:
----------------------------------
    Component/s:     (was: beam-model)
                 dsl-sql

> SqlTransform does not work for beam version above 2.6.0
> -------------------------------------------------------
>
>                 Key: BEAM-6566
>                 URL: https://issues.apache.org/jira/browse/BEAM-6566
>             Project: Beam
>          Issue Type: Bug
>          Components: dsl-sql
>    Affects Versions: 2.7.0, 2.8.0, 2.9.0
>            Reporter: Xuefeng Zhang
>            Assignee: Kenneth Knowles
>            Priority: Critical
>
> *Issue*:
> SqlTransform does not work in Beam versions above 2.6.0. Judging from the code, those versions call PCollection.getSchema, and this function never works, even in 2.6.0.
> *Details:*
> Beam 2.6.0, class BeamPCollectionTable, which is used by SqlTransform:
> public BeamPCollectionTable(PCollection<Row> upstream) {
>   super(((RowCoder) upstream.getCoder()).getSchema());
>   this.upstream = upstream;
> }
> In Beam 2.7.0 and 2.8.0, however, it was changed to: https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamPCollectionTable.java
> As a result, the following error occurs after upgrading Beam from 2.6.0 to 2.9.0:
> java.lang.IllegalStateException: Cannot call getSchema when there is no schema
>  at org.apache.beam.sdk.values.PCollection.getSchema(PCollection.java:328)
>  at org.apache.beam.sdk.extensions.sql.impl.schema.BeamPCollectionTable.<init>(BeamPCollectionTable.java:34)
>  at org.apache.beam.sdk.extensions.sql.SqlTransform.toTableMap(SqlTransform.java:111)
>  at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:91)
>  at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:76)
>  at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
>  at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:471)
>  at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:357)
> *Code:*
> Schema schema = Schema.builder()
> .addStringField("weightMarketValue")
> .addStringField("ticker")
> .addStringField("ratingLongTermFitchRaw")
> .build();
> Row row = Row.withSchema(schema)
> .addValues("weightMarketValue 1", "ticker 1", "ratingLongTermFitchRaw 1")
> .build();
> Version 1:
> PCollection<Row> input = p.apply(Create.of(row)
>     .withSchema(schema, SerializableFunctions.identity(), SerializableFunctions.identity())
>     .withCoder(RowCoder.of(schema)));
> PCollection<Row> output  = input.apply(SqlTransform.query("select * from PCOLLECTION"));
> Version 2:
> PCollection<Row> input = p.apply(Create.of(row)
>     .withRowSchema(schema)
>     .withCoder(RowCoder.of(schema)));
> PCollection<Row> output  = input.apply(SqlTransform.query("select * from PCOLLECTION"));
>  
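
The difference between the two schema lookups described in the report can be illustrated without Beam on the classpath. The sketch below is a simplified stand-in, not Beam's actual code: SchemaLookupSketch, FakeRowCoder, FakeCollection, schemaFromCoder and schemaFromCollection are hypothetical names invented for illustration, and a schema is modelled as a plain list of field names. It assumes only what the report shows: the 2.6.0 constructor read the schema from the collection's coder, while the stack trace shows later versions calling getSchema on the collection, which throws when no schema is attached.

```java
import java.util.Arrays;
import java.util.List;

public class SchemaLookupSketch {

    /** Hypothetical stand-in for a row coder; it always knows its field names. */
    static final class FakeRowCoder {
        private final List<String> fields;

        FakeRowCoder(List<String> fields) {
            this.fields = fields;
        }

        List<String> getSchema() {
            return fields;
        }
    }

    /** Hypothetical stand-in for a collection: a coder is always present, a schema may not be. */
    static final class FakeCollection {
        private final FakeRowCoder coder;
        private final List<String> schema; // null means no schema was attached

        FakeCollection(FakeRowCoder coder, List<String> schema) {
            this.coder = coder;
            this.schema = schema;
        }

        FakeRowCoder getCoder() {
            return coder;
        }

        List<String> getSchema() {
            if (schema == null) {
                // Same message as in the reported stack trace.
                throw new IllegalStateException("Cannot call getSchema when there is no schema");
            }
            return schema;
        }
    }

    // Lookup in the style of the 2.6.0 constructor quoted in the report.
    static List<String> schemaFromCoder(FakeCollection c) {
        return c.getCoder().getSchema();
    }

    // Lookup in the style implied by the stack trace (collection-level getSchema).
    static List<String> schemaFromCollection(FakeCollection c) {
        return c.getSchema();
    }

    public static void main(String[] args) {
        List<String> fields =
            Arrays.asList("weightMarketValue", "ticker", "ratingLongTermFitchRaw");
        // A collection that carries a coder but no attached schema.
        FakeCollection noSchema = new FakeCollection(new FakeRowCoder(fields), null);

        System.out.println("from coder: " + schemaFromCoder(noSchema));
        try {
            schemaFromCollection(noSchema);
        } catch (IllegalStateException e) {
            System.out.println("from collection: " + e.getMessage());
        }
    }
}
```

In this model the coder-based lookup succeeds for any collection that has a row coder, whereas the collection-based lookup depends on a schema having been attached separately. Whether that is the actual cause in Beam is for the issue's assignee to confirm.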



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)