You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by Evan <ch...@foxmail.com> on 2021/01/18 07:06:43 UTC

回复: flink sql hop函数使用udaf,跟踪源码对merge方法进行参数校验时问题

你把你的代码关键信息打码后贴出来让大家看看,现有信息看不出来,不要粘贴图片,图片看不到

祝好!


 
发件人: bigdata
发送时间: 2021-01-18 14:52
收件人: user-zh
主题: flink sql hop函数使用udaf,跟踪源码对merge方法进行参数校验时问题
你好:
    flink1.10.1 sql在使用hop后并将udaf中merge方法名修改正确后,报如下错:为什么会说找不到匹配的merge方法呢
org.apache.flink.table.planner.codegen.CodeGenException: No matching merge method found for AggregateFunction com.autoai.cns.udaf.PercentileUDAF'
跟踪merge 参数校验源码思路:
1、ImperativeAggCodeGen类的checkNeededMethods方法中if (needMerge)   getUserDefinedMethod 2、UserDefinedFunctionUtils类的getUserDefinedMethod方法中,当校验merge第二个参数时(代码详见下面merge)为false,进而导致报错,如何解决?还是说这是flink的一个bug
parameterClassEquals(methodSignature(i), clazz) ||
    parameterDataTypeEquals(internalTypes(i), dataTypes(i))


代码如下:
def merge(accumulator: ListBuffer[Float], its: Iterable[ListBuffer[Float]]): Unit = {
  its.foreach(i => accumulator ++ i)
}

回复:回复: flink sql hop函数使用udaf,跟踪源码对merge方法进行参数校验时问题

Posted by bigdata <11...@qq.com>.
代码如下:
class PercentileUDAF extends AggregateFunction[String, ListBuffer[Float]]{
  //各分位值
  val percentile1 = 0.5
  val percentile2 = 0.75
  val percentile3 = 0.98
  val percentile4 = 0.99
 
  override def getValue(accumulator: ListBuffer[Float]): String = {
    //窗口时间数据大小
    val length = accumulator.size
    var i1 = Math.round(length*percentile1).toInt
    if(i1==0) i1 = 1
    var i2 = Math.round(length*percentile2).toInt
    if(i2==0) i2 = 1
    var i3 = Math.round(length*percentile3).toInt
    if(i3==0) i3 = 1
    var i4 = Math.round(length*percentile4).toInt
    if(i4==0) i4 = 1
    val seq = accumulator.sorted
    //返回结果
    seq(i1-1).toInt+","+seq(i2-1).toInt+","+seq(i3-1).toInt+","+seq(i4-1).toInt
  }

  override def createAccumulator(): ListBuffer[Float] = new ListBuffer[Float]()

  def accumulate(accumulator: ListBuffer[Float], i: Float): Unit = {
    accumulator.append(i)
  }

  def merge(accumulator: ListBuffer[Float], its: Iterable[ListBuffer[Float]]): Unit = {
    its.foreach(i =&gt; accumulator ++ i)
  }





------------------&nbsp;原始邮件&nbsp;------------------
发件人:                                                                                                                        "user-zh"                                                                                    <chengyanan1008@foxmail.com&gt;;
发送时间:&nbsp;2021年1月18日(星期一) 下午3:06
收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;

主题:&nbsp;回复: flink sql hop函数使用udaf,跟踪源码对merge方法进行参数校验时问题



你把你的代码关键信息打码后贴出来让大家看看,现有信息看不出来,不要粘贴图片,图片看不到

祝好!


&nbsp;
发件人: bigdata
发送时间: 2021-01-18 14:52
收件人: user-zh
主题: flink sql hop函数使用udaf,跟踪源码对merge方法进行参数校验时问题
你好:
&nbsp;&nbsp;&nbsp; flink1.10.1 sql在使用hop后并将udaf中merge方法名修改正确后,报如下错:为什么会说找不到匹配的merge方法呢
org.apache.flink.table.planner.codegen.CodeGenException: No matching merge method found for AggregateFunction com.autoai.cns.udaf.PercentileUDAF'
跟踪merge 参数校验源码思路:
1、ImperativeAggCodeGen类的checkNeededMethods方法中if (needMerge)&nbsp;&nbsp; getUserDefinedMethod 2、UserDefinedFunctionUtils类的getUserDefinedMethod方法中,当校验merge第二个参数时(代码详见下面merge)为false,进而导致报错,如何解决?还是说这是flink的一个bug
parameterClassEquals(methodSignature(i), clazz) ||
&nbsp;&nbsp;&nbsp; parameterDataTypeEquals(internalTypes(i), dataTypes(i))


代码如下:
def merge(accumulator: ListBuffer[Float], its: Iterable[ListBuffer[Float]]): Unit = {
&nbsp; its.foreach(i =&gt; accumulator ++ i)
}