You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by Presley <ti...@foxmail.com> on 2021/02/22 03:17:27 UTC

回复:Re:SqlValidatorException: No match found for function signature prod()

这里实现的是自定义聚合函数,而不是标量函数鸭_(:з」∠)_




------------------&nbsp;原始邮件&nbsp;------------------
发件人:                                                                                                                        "user-zh"                                                                                    <chenyegit@163.com&gt;;
发送时间:&nbsp;2021年2月22日(星期一) 中午11:14
收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;

主题:&nbsp;Re:Re:SqlValidatorException: No match found for function signature prod(<NUMERIC&gt;)



应该是继承scalaFunction ?

















在 2021-02-22 10:25:31,"xiaoyue" <18242988825@163.com&gt; 写道:
&gt;捞一下自己,在线等大佬们的回复 _(:з」∠)_
&gt;
&gt;
&gt;
&gt;
&gt;
&gt;
&gt;
&gt;在 2021-02-20 13:14:18,"xiaoyue" <18242988825@163.com&gt; 写道:
&gt;
&gt;我在使用flinksql1.11的udaf时出现SqlValidatorException: No match found for function signature prod(<NUMERIC&gt;),请求大佬帮忙看看_(:з」∠)_
&gt;
&gt;以下是代码:
&gt;-----------------------------------------------------
&gt;...
&gt;&nbsp; stableEnv.createTemporarySystemFunction("prod", ProductAggregateFunction.class);
&gt;&nbsp; Table resultTable = stableEnv.sqlQuery("select pf_id,prod(yldrate+1)-1 as yldrate from queryData group by pf_id");
&gt;...
&gt;-----------------------------------------------------
&gt;@FunctionHint(
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; input = @DataTypeHint("Double"),
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; output = @DataTypeHint("Double")
&gt;)
&gt;public class ProductAggregateFunction extends AggregateFunction<Double, Product&gt; {
&gt;
&gt;
&gt;&nbsp;&nbsp;&nbsp; @Override
&gt;&nbsp;&nbsp;&nbsp; public Double getValue(Product acc) {
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; return acc.prod;
&gt;&nbsp;&nbsp;&nbsp; }
&gt;&nbsp;&nbsp;&nbsp; @Override
&gt;&nbsp;&nbsp;&nbsp; public Product createAccumulator() {
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; return new Product();
&gt;&nbsp;&nbsp;&nbsp; }
&gt;&nbsp;&nbsp;&nbsp; public void accumulate(Product acc, Double iValue) {
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; acc.prod *= iValue;
&gt;&nbsp;&nbsp;&nbsp; }
&gt;&nbsp;&nbsp;&nbsp; public void retract(Product acc, Double iValue) {
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; acc.prod /= iValue;
&gt;&nbsp;&nbsp;&nbsp; }
&gt;&nbsp;&nbsp;&nbsp; public void merge(Product acc, Iterable<Product&gt; it) {
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; for (Product p : it) {
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; accumulate(acc, p.prod);
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }
&gt;&nbsp;&nbsp;&nbsp; }
&gt;&nbsp;&nbsp;&nbsp; public void resetAccumulator(Product acc) {
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; acc.prod = 1D;
&gt;&nbsp;&nbsp;&nbsp; }
&gt;}
&gt;
&gt;
&gt;
&gt;
&gt;
&gt;