You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by "Anton Kedin (JIRA)" <ji...@apache.org> on 2018/03/06 03:55:00 UTC
[jira] [Closed] (BEAM-3777) Subclass of subclass of CombineFn does
not work as UDAF
[ https://issues.apache.org/jira/browse/BEAM-3777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Anton Kedin closed BEAM-3777.
-----------------------------
Resolution: Fixed
Fix Version/s: Not applicable
merged
> Subclass of subclass of CombineFn does not work as UDAF
> -------------------------------------------------------
>
> Key: BEAM-3777
> URL: https://issues.apache.org/jira/browse/BEAM-3777
> Project: Beam
> Issue Type: Bug
> Components: dsl-sql
> Reporter: Samuel Waggoner
> Assignee: Xu Mingmin
> Priority: Major
> Fix For: Not applicable
>
> Time Spent: 5h
> Remaining Estimate: 0h
>
> Our team has a use case where we want to use a subclass of a subclass of CombineFn as a UDAF. This is not working. I have created a minimal example in BeamSqlExample to demonstrate.
> Given the following class definitions:
> {code:java}
> public static class SquareSumSub extends SquareSum {}{code}
> {code:java}
> public static class SquareSum extends Combine.CombineFn<Integer, Integer, Integer> {
> @Override
> public Integer createAccumulator() {
> return 0;
> }
> @Override
> public Integer addInput(Integer accumulator, Integer input) {
> return accumulator + input * input;
> }
> @Override
> public Integer mergeAccumulators(Iterable<Integer> accumulators) {
> int v = 0;
> Iterator<Integer> ite = accumulators.iterator();
> while (ite.hasNext()) {
> v += ite.next();
> }
> return v;
> }
> @Override
> public Integer extractOutput(Integer accumulator) {
> return accumulator;
> }
> }{code}
> I try to use SquareSumSub:
> {code:java}
> //Case 1. run a simple SQL query over input PCollection with BeamSql.simpleQuery;
> PCollection<Row> outputStream = inputTable.apply(
> BeamSql.query("select squaresum(c1) as c1, c2, c3 from PCOLLECTION Group By c2, c3")
> .registerUdaf("squaresum", new SquareSumSub()));{code}
> I get the following exception:
> {code:java}
> Exception in thread "main" java.lang.IllegalStateException: org.apache.calcite.tools.ValidationException: java.lang.ClassCastException: java.lang.Class cannot be cast to java.lang.reflect.ParameterizedType
> at org.apache.beam.sdk.extensions.sql.QueryTransform.expand(QueryTransform.java:76)
> at org.apache.beam.sdk.extensions.sql.QueryTransform.expand(QueryTransform.java:47)
> at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
> at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:472)
> at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:286)
> at org.apache.beam.sdk.extensions.sql.example.BeamSqlExample.main(BeamSqlExample.java:71)
> Caused by: org.apache.calcite.tools.ValidationException: java.lang.ClassCastException: java.lang.Class cannot be cast to java.lang.reflect.ParameterizedType
> at org.apache.calcite.prepare.PlannerImpl.validate(PlannerImpl.java:195)
> at org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner.validateNode(BeamQueryPlanner.java:173)
> at org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner.validateAndConvert(BeamQueryPlanner.java:153)
> at org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner.convertToBeamRel(BeamQueryPlanner.java:144)
> at org.apache.beam.sdk.extensions.sql.QueryTransform.expand(QueryTransform.java:73)
> ... 5 more
> Caused by: java.lang.ClassCastException: java.lang.Class cannot be cast to java.lang.reflect.ParameterizedType
> at org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.UdafImpl$1.getType(UdafImpl.java:63)
> at org.apache.calcite.prepare.CalciteCatalogReader.toOp(CalciteCatalogReader.java:321)
> at org.apache.calcite.prepare.CalciteCatalogReader.access$000(CalciteCatalogReader.java:81)
> at org.apache.calcite.prepare.CalciteCatalogReader$3.apply(CalciteCatalogReader.java:312)
> at org.apache.calcite.prepare.CalciteCatalogReader$3.apply(CalciteCatalogReader.java:310)
> at com.google.common.collect.Iterators$7.transform(Iterators.java:750)
> at com.google.common.collect.TransformedIterator.next(TransformedIterator.java:47)
> at java.util.AbstractCollection.toArray(AbstractCollection.java:141)
> at java.util.ArrayList.addAll(ArrayList.java:577)
> at org.apache.calcite.prepare.CalciteCatalogReader.lookupOperatorOverloads(CalciteCatalogReader.java:308)
> at org.apache.calcite.sql.util.ChainedSqlOperatorTable.lookupOperatorOverloads(ChainedSqlOperatorTable.java:72)
> at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1132)
> at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1117)
> at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1146)
> at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1117)
> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:896)
> at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:613)
> at org.apache.calcite.prepare.PlannerImpl.validate(PlannerImpl.java:193)
> ... 9 more{code}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)