You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@beam.apache.org by Sonam Ramchand <so...@venturedive.com> on 2021/02/09 18:22:37 UTC

COVAR_POP aggregate function test for the ZetaSql dialec

Hi Devs,
I am trying to test the COVAR_POP aggregate function for the ZetaSql
dialect. I see
https://github.com/apache/beam/blob/b74fcf7b30d956fb42830d652a57b265a1546973/sdks/[…]he/beam/sdk/extensions/sql/impl/transform/agg/CovarianceFn.java
<https://github.com/apache/beam/blob/b74fcf7b30d956fb42830d652a57b265a1546973/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/agg/CovarianceFn.java#L50>
is
implemented as CombineFn and it works correctly
https://github.com/apache/beam/blob/befcc3d780d561e81f23512742862a65c0ae3b69/sdks/[…]eam/sdk/extensions/sql/BeamSqlDslAggregationCovarianceTest.java
<https://github.com/apache/beam/blob/befcc3d780d561e81f23512742862a65c0ae3b69/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationCovarianceTest.java#L87>
.However, for ZetaSql dialect, it throws:
covar_pop has more than one argument.
java.lang.IllegalArgumentException: covar_pop has more than one argument.Unit
test:

public void testZetaSqlCovarPop() {
  String sql = "SELECT COVAR_POP(row_id,int64_col) FROM
table_all_types  GROUP BY bool_col";

  ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
  BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
  PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline,
beamRelNode);

  final Schema schema = Schema.builder().addDoubleField("field1").build();
  PAssert.that(stream)
      .containsInAnyOrder(
          Row.withSchema(schema).addValue(-1.00000).build(),
          Row.withSchema(schema).addValue(-1.55556).build());

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}

Can anybody help me in understanding the cause of this problem? I do not
understand how it works correctly in other places and not in ZetaSqlDialect.
For reference: https://github.com/apache/beam/pull/13915

I would really appreciate any sort of input on this.
-- 

Regards,
*Sonam*
Software Engineer
Mobile: +92 3088337296

<http://venturedive.com/>

Re: COVAR_POP aggregate function test for the ZetaSql dialec

Posted by Andrew Pilloud <ap...@google.com>.
Looks like the code that converts the parsed ZetaSQL to a Calcite logical
expression doesn't currently support aggregate functions with multiple
columns. See this TODO:
https://github.com/apache/beam/blob/3bb232fb098700de408f574585dfe74bbaff7230/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/AggregateScanConverter.java#L147

On Tue, Feb 9, 2021 at 10:22 AM Sonam Ramchand <
sonam.ramchand@venturedive.com> wrote:

> Hi Devs,
> I am trying to test the COVAR_POP aggregate function for the ZetaSql
> dialect. I see
> https://github.com/apache/beam/blob/b74fcf7b30d956fb42830d652a57b265a1546973/sdks/[…]he/beam/sdk/extensions/sql/impl/transform/agg/CovarianceFn.java
> <https://github.com/apache/beam/blob/b74fcf7b30d956fb42830d652a57b265a1546973/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/agg/CovarianceFn.java#L50> is
> implemented as CombineFn and it works correctly
> https://github.com/apache/beam/blob/befcc3d780d561e81f23512742862a65c0ae3b69/sdks/[…]eam/sdk/extensions/sql/BeamSqlDslAggregationCovarianceTest.java
> <https://github.com/apache/beam/blob/befcc3d780d561e81f23512742862a65c0ae3b69/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationCovarianceTest.java#L87>
> .However, for ZetaSql dialect, it throws:
> covar_pop has more than one argument.
> java.lang.IllegalArgumentException: covar_pop has more than one argument.Unit
> test:
>
> public void testZetaSqlCovarPop() {
>   String sql = "SELECT COVAR_POP(row_id,int64_col) FROM table_all_types  GROUP BY bool_col";
>
>   ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
>   BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
>   PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
>
>   final Schema schema = Schema.builder().addDoubleField("field1").build();
>   PAssert.that(stream)
>       .containsInAnyOrder(
>           Row.withSchema(schema).addValue(-1.00000).build(),
>           Row.withSchema(schema).addValue(-1.55556).build());
>
>   pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
> }
>
> Can anybody help me in understanding the cause of this problem? I do not
> understand how it works correctly in other places and not in ZetaSqlDialect.
> For reference: https://github.com/apache/beam/pull/13915
>
> I would really appreciate any sort of input on this.
> --
>
> Regards,
> *Sonam*
> Software Engineer
> Mobile: +92 3088337296 <+92%20308%208337296>
>
> <http://venturedive.com/>
>