Posted to user@beam.apache.org by Zhiheng Huang <sy...@gmail.com> on 2021/03/02 19:18:53 UTC

Re: Select one field from an array of nested rows in beam SQL

I see. Thanks for the info!

On Thu, Feb 25, 2021 at 6:11 PM Andrew Pilloud <ap...@google.com> wrote:

> There is some recent work to improve unnest
> <https://github.com/apache/beam/pull/12843> that went into Beam 2.25.0+;
> it might cover your use case. However, it looks like we have no support
> for the Collect operator, which is your problem here. I verified that
> 'SELECT ARRAY(SELECT f_int FROM PCOLLECTION)' doesn't work and filed
> https://issues.apache.org/jira/browse/BEAM-11872
>
> For the UDF side of things, we haven't put much work into making nested
> rows work well with UDFs. WrappedRow is intended to be an internal wrapper
> for BeamCalcRel; we should probably be passing a schema Row instead, which
> gives you access to fields by name. I filed a JIRA for this:
> https://issues.apache.org/jira/browse/BEAM-11871
>
> Andrew
>
> On Wed, Feb 24, 2021 at 10:33 PM Zhiheng Huang <sy...@gmail.com>
> wrote:
>
>> Hi beam users,
>>
>> We have a use case where we have a schema such as:
>>
>> Schema.of(
>>     Field.of("array_of_nested_rows",
>>              FieldType.array(FieldType.row(
>>                  Schema.of(Field.of("row_field1", FieldType.INT32))))),
>>     Field.of("otherScalarField", FieldType.STRING)
>> )
>>
>> We would like to select "array_of_nested_rows.row_field1" as a list of
>> ints together with "otherScalarField" as the output. For example, in
>> BigQuery we can achieve this with:
>>
>> SELECT
>>   otherScalarField,
>>   ARRAY(SELECT row_field1 FROM UNNEST(array_of_nested_rows))
>> FROM
>>   table
>>
>> Trying this query with beam SQL yields:
>>
>> Unable to convert query select array(select score from
>> UNNEST(Yt8mAnnotation)) from PCOLLECTION
>> org.apache.beam.sdk.extensions.sql.impl.SqlConversionException: Unable to
>> convert query select array(select score from UNNEST(Yt8mAnnotation)) from
>> PCOLLECTION
>> at
>> org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner.convertToBeamRel(CalciteQueryPlanner.java:181)
>> at
>> org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv.parseQuery(BeamSqlEnv.java:109)
>> at
>> org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:135)
>> at
>> org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:86)
>> ...
>>
>> Caused by:
>> org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptPlanner$CannotPlanException:
>> There are not enough rules to produce a node with desired properties:
>> convention=BEAM_LOGICAL.
>> Missing conversion is LogicalCorrelate[convention: NONE -> BEAM_LOGICAL]
>> There is 1 empty subset: rel#63:Subset#6.BEAM_LOGICAL, the relevant part
>> of the original plan is as follows
>> 56:LogicalCorrelate(correlation=[$cor0], joinType=[inner],
>> requiredColumns=[{0}])
>>   8:BeamIOSourceRel(subset=[rel#46:Subset#0.BEAM_LOGICAL], table=[[beam,
>> PCOLLECTION]])
>>   54:Collect(subset=[rel#55:Subset#5.NONE], field=[EXPR$0])
>>     52:LogicalProject(subset=[rel#53:Subset#4.NONE], score=[$2])
>>       50:Uncollect(subset=[rel#51:Subset#3.NONE])
>> ...
>>
>> We have also tried to define a UDF that takes in array_of_nested_rows.
>> This doesn't work out either because the input param passed into the UDF
>> eval function is a list of WrappedRow
>> <https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java#L642>,
>> which doesn't allow us to look up a field value by name. It only supports
>> getting a field value by index. This doesn't help us, since we do not know
>> how to get the nested row's schema in the eval function.
>>
>> Do you have any suggestions about how to achieve this? We are using Beam
>> 2.22.0.
>>
>> Thanks a lot!
>>
>
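
For readers landing on this thread: the result the original query is after
(each otherScalarField paired with the list of row_field1 values from its
array_of_nested_rows) can be modeled in plain Java as below. This is only a
sketch of the intended semantics. The classes NestedRow, InputRecord, and
OutputRecord and the method project are hypothetical stand-ins invented for
illustration; they are not Beam classes and do not use the Beam SQL or Row
APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java model of the desired projection. All types here are hypothetical, not Beam classes. */
public class NestedFieldProjection {

    /** Stand-in for one element of array_of_nested_rows. */
    static final class NestedRow {
        final int rowField1;

        NestedRow(int rowField1) {
            this.rowField1 = rowField1;
        }
    }

    /** Stand-in for one input record with the schema from the question. */
    static final class InputRecord {
        final List<NestedRow> arrayOfNestedRows;
        final String otherScalarField;

        InputRecord(List<NestedRow> arrayOfNestedRows, String otherScalarField) {
            this.arrayOfNestedRows = arrayOfNestedRows;
            this.otherScalarField = otherScalarField;
        }
    }

    /** Stand-in for one output record: the scalar field plus the extracted ints. */
    static final class OutputRecord {
        final String otherScalarField;
        final List<Integer> rowField1Values;

        OutputRecord(String otherScalarField, List<Integer> rowField1Values) {
            this.otherScalarField = otherScalarField;
            this.rowField1Values = rowField1Values;
        }
    }

    /** Collects row_field1 from every nested row, preserving array order. */
    static OutputRecord project(InputRecord in) {
        List<Integer> values = new ArrayList<>();
        for (NestedRow nested : in.arrayOfNestedRows) {
            values.add(nested.rowField1);
        }
        return new OutputRecord(in.otherScalarField, values);
    }

    public static void main(String[] args) {
        InputRecord in = new InputRecord(
            Arrays.asList(new NestedRow(1), new NestedRow(2), new NestedRow(3)), "a");
        OutputRecord out = project(in);
        // Prints: a -> [1, 2, 3]
        System.out.println(out.otherScalarField + " -> " + out.rowField1Values);
    }
}
```

Until the Collect operator is supported, the same per-record logic would
presumably have to run outside the SQL query, for example in a regular
transform applied before or after SqlTransform.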

-- 
Sylvon Huang