Posted to commits@beam.apache.org by "Samuel Waggoner (JIRA)" <ji...@apache.org> on 2018/03/05 16:38:00 UTC

[jira] [Created] (BEAM-3777) Subclass of subclass of CombineFn does not work as UDAF

Samuel Waggoner created BEAM-3777:
-------------------------------------

             Summary: Subclass of subclass of CombineFn does not work as UDAF
                 Key: BEAM-3777
                 URL: https://issues.apache.org/jira/browse/BEAM-3777
             Project: Beam
          Issue Type: Bug
          Components: dsl-sql
            Reporter: Samuel Waggoner
            Assignee: Xu Mingmin


Our team needs to use a subclass of a subclass of CombineFn as a UDAF, but the query fails during validation. I have created a minimal example in BeamSqlExample to demonstrate.

Given the following class definitions:
{code:java}
public static class SquareSumSub extends SquareSum {}{code}
{code:java}
public static class SquareSum extends Combine.CombineFn<Integer, Integer, Integer> {
  @Override
  public Integer createAccumulator() {
    return 0;
  }

  @Override
  public Integer addInput(Integer accumulator, Integer input) {
    return accumulator + input * input;
  }

  @Override
  public Integer mergeAccumulators(Iterable<Integer> accumulators) {
    int v = 0;
    Iterator<Integer> ite = accumulators.iterator();
    while (ite.hasNext()) {
      v += ite.next();
    }
    return v;
  }

  @Override
  public Integer extractOutput(Integer accumulator) {
    return accumulator;
  }
}{code}
I then register SquareSumSub as the UDAF:
{code:java}
// Case 1: run a simple SQL query over the input PCollection with BeamSql.query.
PCollection<Row> outputStream = inputTable.apply(
    BeamSql.query("select squaresum(c1) as c1, c2, c3 from PCOLLECTION Group By c2, c3")
        .registerUdaf("squaresum", new SquareSumSub()));{code}
I get the following exception:
{code:java}
Exception in thread "main" java.lang.IllegalStateException: org.apache.calcite.tools.ValidationException: java.lang.ClassCastException: java.lang.Class cannot be cast to java.lang.reflect.ParameterizedType
 at org.apache.beam.sdk.extensions.sql.QueryTransform.expand(QueryTransform.java:76)
 at org.apache.beam.sdk.extensions.sql.QueryTransform.expand(QueryTransform.java:47)
 at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
 at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:472)
 at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:286)
 at org.apache.beam.sdk.extensions.sql.example.BeamSqlExample.main(BeamSqlExample.java:71)
Caused by: org.apache.calcite.tools.ValidationException: java.lang.ClassCastException: java.lang.Class cannot be cast to java.lang.reflect.ParameterizedType
 at org.apache.calcite.prepare.PlannerImpl.validate(PlannerImpl.java:195)
 at org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner.validateNode(BeamQueryPlanner.java:173)
 at org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner.validateAndConvert(BeamQueryPlanner.java:153)
 at org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner.convertToBeamRel(BeamQueryPlanner.java:144)
 at org.apache.beam.sdk.extensions.sql.QueryTransform.expand(QueryTransform.java:73)
 ... 5 more
Caused by: java.lang.ClassCastException: java.lang.Class cannot be cast to java.lang.reflect.ParameterizedType
 at org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.UdafImpl$1.getType(UdafImpl.java:63)
 at org.apache.calcite.prepare.CalciteCatalogReader.toOp(CalciteCatalogReader.java:321)
 at org.apache.calcite.prepare.CalciteCatalogReader.access$000(CalciteCatalogReader.java:81)
 at org.apache.calcite.prepare.CalciteCatalogReader$3.apply(CalciteCatalogReader.java:312)
 at org.apache.calcite.prepare.CalciteCatalogReader$3.apply(CalciteCatalogReader.java:310)
 at com.google.common.collect.Iterators$7.transform(Iterators.java:750)
 at com.google.common.collect.TransformedIterator.next(TransformedIterator.java:47)
 at java.util.AbstractCollection.toArray(AbstractCollection.java:141)
 at java.util.ArrayList.addAll(ArrayList.java:577)
 at org.apache.calcite.prepare.CalciteCatalogReader.lookupOperatorOverloads(CalciteCatalogReader.java:308)
 at org.apache.calcite.sql.util.ChainedSqlOperatorTable.lookupOperatorOverloads(ChainedSqlOperatorTable.java:72)
 at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1132)
 at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1117)
 at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1146)
 at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1117)
 at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:896)
 at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:613)
 at org.apache.calcite.prepare.PlannerImpl.validate(PlannerImpl.java:193)
 ... 9 more{code}
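
The innermost cause in the trace is a cast to java.lang.reflect.ParameterizedType inside UdafImpl. I have not confirmed the exact code there, but the message is consistent with calling getGenericSuperclass() on the registered function's class and casting the result directly. That works for a direct subclass of a generic class, but for a subclass of a subclass the direct superclass (here SquareSum) has no type arguments, so reflection returns a plain Class. The sketch below reproduces that behavior without Beam and shows one possible workaround: walking up the hierarchy until the parameterized superclass is found. Fn is a hypothetical stand-in for Combine.CombineFn, and findInputType is an illustrative helper, not an existing Beam method.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

final class GenericSuperclassDemo {
  // Hypothetical stand-in for Combine.CombineFn<InputT, AccumT, OutputT>.
  abstract static class Fn<InputT, AccumT, OutputT> {}

  // Direct subclass: its generic superclass is Fn<Integer, Integer, Integer>.
  static class SquareSum extends Fn<Integer, Integer, Integer> {}

  // Subclass of a subclass: its generic superclass is the plain class SquareSum.
  static class SquareSumSub extends SquareSum {}

  /** Returns true if the direct generic superclass carries type arguments. */
  static boolean directSuperclassIsParameterized(Class<?> clazz) {
    return clazz.getGenericSuperclass() instanceof ParameterizedType;
  }

  /**
   * Walks up the class hierarchy until the superclass is a parameterization
   * of Fn, then returns its first type argument (the input type).
   */
  static Type findInputType(Class<?> clazz) {
    for (Class<?> c = clazz; c != null && c != Object.class; c = c.getSuperclass()) {
      Type parent = c.getGenericSuperclass();
      if (parent instanceof ParameterizedType
          && ((ParameterizedType) parent).getRawType() == Fn.class) {
        return ((ParameterizedType) parent).getActualTypeArguments()[0];
      }
    }
    throw new IllegalArgumentException(clazz + " does not extend a parameterized Fn");
  }

  public static void main(String[] args) {
    System.out.println(directSuperclassIsParameterized(SquareSum.class));    // true
    System.out.println(directSuperclassIsParameterized(SquareSumSub.class)); // false
    System.out.println(findInputType(SquareSumSub.class)); // class java.lang.Integer
  }
}
```

This sketch only handles the case where some ancestor binds the type parameters to concrete classes. If an intermediate class is itself generic, the returned Type may be a type variable that needs further resolution.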



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)