Posted to commits@beam.apache.org by "Anton Kedin (JIRA)" <ji...@apache.org> on 2018/03/06 03:55:00 UTC

[jira] [Closed] (BEAM-3777) Subclass of subclass of CombineFn does not work as UDAF

     [ https://issues.apache.org/jira/browse/BEAM-3777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Anton Kedin closed BEAM-3777.
-----------------------------
       Resolution: Fixed
    Fix Version/s: Not applicable

merged

> Subclass of subclass of CombineFn does not work as UDAF
> -------------------------------------------------------
>
>                 Key: BEAM-3777
>                 URL: https://issues.apache.org/jira/browse/BEAM-3777
>             Project: Beam
>          Issue Type: Bug
>          Components: dsl-sql
>            Reporter: Samuel Waggoner
>            Assignee: Xu Mingmin
>            Priority: Major
>             Fix For: Not applicable
>
>          Time Spent: 5h
>  Remaining Estimate: 0h
>
> Our team needs to use a subclass of a subclass of CombineFn as a UDAF, but registering one fails during query validation. I have created a minimal example in BeamSqlExample to demonstrate.
> Given the following class definitions:
> {code:java}
> public static class SquareSumSub extends SquareSum {}{code}
> {code:java}
> public static class SquareSum extends Combine.CombineFn<Integer, Integer, Integer> {
>   @Override
>   public Integer createAccumulator() {
>     return 0;
>   }
>   @Override
>   public Integer addInput(Integer accumulator, Integer input) {
>     return accumulator + input * input;
>   }
>   @Override
>   public Integer mergeAccumulators(Iterable<Integer> accumulators) {
>     int v = 0;
>     Iterator<Integer> ite = accumulators.iterator();
>     while (ite.hasNext()) {
>       v += ite.next();
>     }
>     return v;
>   }
>   @Override
>   public Integer extractOutput(Integer accumulator) {
>     return accumulator;
>   }
> }{code}
> I try to use SquareSumSub:
> {code:java}
> // Case 1: run a simple SQL query over the input PCollection with BeamSql.query.
> PCollection<Row> outputStream = inputTable.apply(
>     BeamSql.query("select squaresum(c1) as c1, c2, c3 from PCOLLECTION Group By c2, c3")
>         .registerUdaf("squaresum", new SquareSumSub()));{code}
> I get the following exception:
> {code:java}
> Exception in thread "main" java.lang.IllegalStateException: org.apache.calcite.tools.ValidationException: java.lang.ClassCastException: java.lang.Class cannot be cast to java.lang.reflect.ParameterizedType
>  at org.apache.beam.sdk.extensions.sql.QueryTransform.expand(QueryTransform.java:76)
>  at org.apache.beam.sdk.extensions.sql.QueryTransform.expand(QueryTransform.java:47)
>  at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
>  at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:472)
>  at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:286)
>  at org.apache.beam.sdk.extensions.sql.example.BeamSqlExample.main(BeamSqlExample.java:71)
> Caused by: org.apache.calcite.tools.ValidationException: java.lang.ClassCastException: java.lang.Class cannot be cast to java.lang.reflect.ParameterizedType
>  at org.apache.calcite.prepare.PlannerImpl.validate(PlannerImpl.java:195)
>  at org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner.validateNode(BeamQueryPlanner.java:173)
>  at org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner.validateAndConvert(BeamQueryPlanner.java:153)
>  at org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner.convertToBeamRel(BeamQueryPlanner.java:144)
>  at org.apache.beam.sdk.extensions.sql.QueryTransform.expand(QueryTransform.java:73)
>  ... 5 more
> Caused by: java.lang.ClassCastException: java.lang.Class cannot be cast to java.lang.reflect.ParameterizedType
>  at org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.UdafImpl$1.getType(UdafImpl.java:63)
>  at org.apache.calcite.prepare.CalciteCatalogReader.toOp(CalciteCatalogReader.java:321)
>  at org.apache.calcite.prepare.CalciteCatalogReader.access$000(CalciteCatalogReader.java:81)
>  at org.apache.calcite.prepare.CalciteCatalogReader$3.apply(CalciteCatalogReader.java:312)
>  at org.apache.calcite.prepare.CalciteCatalogReader$3.apply(CalciteCatalogReader.java:310)
>  at com.google.common.collect.Iterators$7.transform(Iterators.java:750)
>  at com.google.common.collect.TransformedIterator.next(TransformedIterator.java:47)
>  at java.util.AbstractCollection.toArray(AbstractCollection.java:141)
>  at java.util.ArrayList.addAll(ArrayList.java:577)
>  at org.apache.calcite.prepare.CalciteCatalogReader.lookupOperatorOverloads(CalciteCatalogReader.java:308)
>  at org.apache.calcite.sql.util.ChainedSqlOperatorTable.lookupOperatorOverloads(ChainedSqlOperatorTable.java:72)
>  at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1132)
>  at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1117)
>  at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1146)
>  at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1117)
>  at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:896)
>  at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:613)
>  at org.apache.calcite.prepare.PlannerImpl.validate(PlannerImpl.java:193)
>  ... 9 more{code}
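
The innermost cause in the trace (UdafImpl$1.getType failing to cast java.lang.Class to ParameterizedType) is consistent with code that casts the result of getGenericSuperclass() directly. The sketch below reproduces that behavior outside Beam. It is an illustration under assumptions, not the actual UdafImpl source or the merged fix: CombineFn here is a hypothetical stand-in for Combine.CombineFn, and both helper methods are invented for the example.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

public class GenericSuperclassDemo {
    // Hypothetical stand-in for Beam's Combine.CombineFn<InputT, AccumT, OutputT>.
    abstract static class CombineFn<InputT, AccumT, OutputT> {}

    static class SquareSum extends CombineFn<Integer, Integer, Integer> {}

    static class SquareSumSub extends SquareSum {}

    // Assumed failing pattern: cast the direct generic superclass.
    // This works only when the class extends CombineFn<...> directly.
    static Type inputTypeByDirectCast(Class<?> clazz) {
        ParameterizedType pt = (ParameterizedType) clazz.getGenericSuperclass();
        return pt.getActualTypeArguments()[0];
    }

    // One possible workaround: walk up the hierarchy until the generic
    // superclass is the parameterized CombineFn, then read its first argument.
    static Type inputTypeByWalking(Class<?> clazz) {
        for (Class<?> c = clazz; c != null && c != Object.class; c = c.getSuperclass()) {
            Type superType = c.getGenericSuperclass();
            if (superType instanceof ParameterizedType
                    && ((ParameterizedType) superType).getRawType() == CombineFn.class) {
                return ((ParameterizedType) superType).getActualTypeArguments()[0];
            }
        }
        throw new IllegalArgumentException(clazz + " does not extend CombineFn");
    }

    public static void main(String[] args) {
        // Direct subclass: the generic superclass is CombineFn<Integer, Integer, Integer>.
        System.out.println(inputTypeByDirectCast(SquareSum.class));
        // Subclass of subclass: the generic superclass is the plain class SquareSum,
        // so the direct cast throws while the walk still finds the input type.
        System.out.println(inputTypeByWalking(SquareSumSub.class));
        try {
            inputTypeByDirectCast(SquareSumSub.class);
        } catch (ClassCastException e) {
            System.out.println("ClassCastException for SquareSumSub");
        }
    }
}
```

For SquareSumSub, getGenericSuperclass() returns the non-generic class SquareSum, which is a java.lang.Class and not a ParameterizedType, matching the message in the trace. The walking variant is deliberately simple: if an intermediate class forwards its own type variable to CombineFn, it returns that type variable instead of a concrete class, so a production fix would need fuller type resolution.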


