You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by guanyq <dl...@163.com> on 2021/02/25 09:47:59 UTC

Flink1.12.0版本 Distinct Aggregation

附件是代码,按照官网写的demo。
不知道哪里有问题,麻烦帮忙看下。


root

 |-- orderId: STRING

 |-- userId: INT

 |-- money: INT

 |-- createTime: BIGINT

 |-- pt: TIMESTAMP(3) *PROCTIME*




17:17:11,935 INFO  org.apache.flink.api.java.typeutils.TypeExtractor            [] - class org.apache.flink.types.Row is missing a default constructor so it cannot be used as a POJO type and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.

Exception in thread "main" java.lang.RuntimeException: Unknown call expression: count(orderId)

at org.apache.flink.table.planner.expressions.converter.ExpressionConverter.visit(ExpressionConverter.java:102)

at org.apache.flink.table.planner.expressions.converter.ExpressionConverter.visit(ExpressionConverter.java:72)

at org.apache.flink.table.expressions.CallExpression.accept(CallExpression.java:126)

at org.apache.flink.table.planner.expressions.converter.ExpressionConverter$1.toRexNode(ExpressionConverter.java:226)

at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)

at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)

at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)

at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)

at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)

at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)

at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)

at org.apache.flink.table.planner.expressions.converter.OverConvertRule.convert(OverConvertRule.java:81)

at org.apache.flink.table.planner.expressions.converter.ExpressionConverter.visit(ExpressionConverter.java:97)

at org.apache.flink.table.planner.expressions.converter.ExpressionConverter.visit(ExpressionConverter.java:72)

at org.apache.flink.table.expressions.CallExpression.accept(CallExpression.java:126)

at org.apache.flink.table.planner.expressions.converter.ExpressionConverter$1.toRexNode(ExpressionConverter.java:226)

at org.apache.flink.table.planner.expressions.converter.CustomizedConvertRule.convertAs(CustomizedConvertRule.java:251)

at org.apache.flink.table.planner.expressions.converter.CustomizedConvertRule.lambda$convert$0(CustomizedConvertRule.java:93)

at java.util.Optional.map(Optional.java:215)

at org.apache.flink.table.planner.expressions.converter.CustomizedConvertRule.convert(CustomizedConvertRule.java:93)

at org.apache.flink.table.planner.expressions.converter.ExpressionConverter.visit(ExpressionConverter.java:97)

at org.apache.flink.table.planner.expressions.converter.ExpressionConverter.visit(ExpressionConverter.java:72)

at org.apache.flink.table.expressions.CallExpression.accept(CallExpression.java:126)

at org.apache.flink.table.planner.plan.QueryOperationConverter.convertExprToRexNode(QueryOperationConverter.java:734)

at org.apache.flink.table.planner.plan.QueryOperationConverter.access$800(QueryOperationConverter.java:129)

at org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.lambda$convertToRexNodes$6(QueryOperationConverter.java:540)

at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)

at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)

at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)

at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)

at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)

at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)

at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)

at org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.convertToRexNodes(QueryOperationConverter.java:541)

at org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:153)

at org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:149)

at org.apache.flink.table.operations.ProjectQueryOperation.accept(ProjectQueryOperation.java:75)

at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:146)

at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:128)

at org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:47)

at org.apache.flink.table.operations.ProjectQueryOperation.accept(ProjectQueryOperation.java:75)

at org.apache.flink.table.planner.calcite.FlinkRelBuilder.queryOperation(FlinkRelBuilder.scala:186)

at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:250)

at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:164)

at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:164)

at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)

at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)

at scala.collection.Iterator$class.foreach(Iterator.scala:891)

at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)

at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)

at scala.collection.AbstractIterable.foreach(Iterable.scala:54)

at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)

at scala.collection.AbstractTraversable.map(Traversable.scala:104)

at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164)

at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:331)

at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.java:307)

at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.java:298)

at com.guanyq.study.TableAPIAndSQL.TableAPI.Aggregations.DistinctAggregation3.main(DistinctAggregation3.java:68)