You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Timo Walther (JIRA)" <ji...@apache.org> on 2019/04/18 10:12:00 UTC

[jira] [Commented] (FLINK-12114) Try to perform UDF By Reflection

    [ https://issues.apache.org/jira/browse/FLINK-12114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16820937#comment-16820937 ] 

Timo Walther commented on FLINK-12114:
--------------------------------------

[~jiaqiang] I saw this type of exception in the past. It looks like a classloading issue. You need to make sure that all resources are available in Flink usercode classloader.

> Try to perform UDF By Reflection
> --------------------------------
>
>                 Key: FLINK-12114
>                 URL: https://issues.apache.org/jira/browse/FLINK-12114
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / API
>    Affects Versions: 1.6.5
>            Reporter: dong
>            Priority: Major
>
> Hi Team:
>      I recently worked on a Flink SQL-based project and then one of the requirements was to dynamically execute the Flink SQL UDF by loading a user-defined UDF. I first used cglib to dynamically add the eval method to the ScalarFunction and then dynamically create the ScalarFunction instance. And call the user-defined UDF by reflection on the line.
> this my code
> {code:java}
> package com.ximalaya.flink.dsl.stream.udf
> import java.lang.reflect.Method
> import com.ximalaya.flink.dsl.stream.`type`.FieldType
> import com.ximalaya.flink.dsl.stream.api.udf.\{AbstractUserUdf, UserUdfContext}
> import net.sf.cglib.core.Signature
> import net.sf.cglib.proxy.\{Enhancer, InterfaceMaker, MethodInterceptor, MethodProxy}
> import org.apache.flink.table.functions.ScalarFunction
> import org.apache.flink.table.shaded.org.apache.commons.lang.ArrayUtils
> import net.sf.cglib.asm.Type
> import scala.collection.mutable.\{Map ⇒ IMap}
> /**
>  *
>  * @author martin.dong
>  *
>  **/
> private class UserUdfFunction extends ScalarFunction{
>  override def isDeterministic: Boolean = false
> }
> private class UdfMethodInterceptor(val name:String,
>  val fullyQualifiedClassName:String) extends MethodInterceptor with Serializable {
>  private var userUdf: AbstractUserUdf = _
>  private var evalMethods:IMap[MethodSignature,Method]=IMap()
>  private var closeMethod:Method = _
>  private var openMethod:Method = _
>  override def intercept(o: scala.Any,
>  method: Method,
>  objects: Array[AnyRef], methodProxy: MethodProxy): AnyRef = {
>  val methodName=method.getName
>  methodName match {
>  case "open"⇒
>  this.userUdf = Class.forName(fullyQualifiedClassName).newInstance().asInstanceOf[AbstractUserUdf]
>  this.userUdf.getClass.getDeclaredMethods.filter(_.getName=="eval").
>  foreach(method ⇒ evalMethods.put(MethodSignature.createMethodSignature(method), method))
>  this.closeMethod = classOf[AbstractUserUdf].getDeclaredMethod("close")
>  this.openMethod = classOf[AbstractUserUdf].getDeclaredMethod("open",classOf[UserUdfContext])
>  openMethod.invoke(userUdf,null)
>  case "eval"⇒
>  val methodSignature = MethodSignature.createMethodSignature(method)
>  evalMethods(methodSignature).invoke(userUdf,objects:_*)
>  case "close"⇒
>  closeMethod.invoke(userUdf)
>  case _⇒
>  methodProxy.invokeSuper(o,objects)
>  }
>  }
> }
> private class MethodSignature (val fieldTypes:Array[FieldType]){
>  def this(clazzArray:Array[Class[_]]){
>  this(clazzArray.map(clazz⇒FieldType.get(clazz)))
>  }
>  override def hashCode(): Int = fieldTypes.map(_.hashCode()).sum
>  override def equals(obj: scala.Any): Boolean = {
>  if(this.eq(obj.asInstanceOf[AnyRef])){
>  return true
>  }
>  obj match {
>  case _: MethodSignature⇒ ArrayUtils.isEquals(this.fieldTypes,obj.asInstanceOf[MethodSignature].fieldTypes)
>  case _ ⇒ false
>  }
>  }
>  override def toString: String =fieldTypes.map(_.toString).mkString(",")
> }
> private object MethodSignature{
>  def createMethodSignature(method:Method):MethodSignature={
>  new MethodSignature(method.getParameterTypes)
>  }
> }
> case class EvalMethod(returnType:FieldType,parameters:Array[FieldType],exceptions:List[Class[Throwable]])
> object UserUdfFactory {
>  def createUserUdf(name:String,fullyQualifiedClassName:String,evalMethods:List[EvalMethod]):ScalarFunction={
>  val enhancer = new Enhancer
>  enhancer.setSuperclass(classOf[UserUdfFunction])
>  enhancer.setCallback(new UdfMethodInterceptor(name,fullyQualifiedClassName))
>  enhancer.setInterfaces(evalMethods.map(method⇒{
>  val returnType=Type.getType(method.returnType.getClazz)
>  val parameters=method.parameters.map(p⇒Type.getType(p.getClazz))
>  (new Signature("eval",returnType,parameters),method.exceptions)
>  }).map{ case(signature,exceptions)⇒
>  val im = new InterfaceMaker
>  im.add(signature,exceptions.map(exception⇒Type.getType(exception)).toArray)
>  im.create()
>  }.toArray)
>  enhancer.create().asInstanceOf[ScalarFunction]
>  }
> }
> {code}
> Can be executed in local mode but cannot be executed in yarn mode, the following error will occur
> {code:java}
> Caused by: org.codehaus.commons.compiler.CompileException: Line 5, Column 10: Cannot determine simple type name "com"
>         at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11672)
>         at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6416)
>         at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6177)
>         at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6190)
>         at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6190)
>         at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6190)
>         at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6190)
>         at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6156)
>         at org.codehaus.janino.UnitCompiler.access$13300(UnitCompiler.java:212)
>         at org.codehaus.janino.UnitCompiler$18$1.visitReferenceType(UnitCompiler.java:6064)
>         at org.codehaus.janino.UnitCompiler$18$1.visitReferenceType(UnitCompiler.java:6059)
>         at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3754)
>         at org.codehaus.janino.UnitCompiler$18.visitType(UnitCompiler.java:6059)
>         at org.codehaus.janino.UnitCompiler$18.visitType(UnitCompiler.java:6052)
>         at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3753)
>         at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6052)
>         at org.codehaus.janino.UnitCompiler.access$1200(UnitCompiler.java:212)
>         at org.codehaus.janino.UnitCompiler$21.getType(UnitCompiler.java:7844)
>         at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6456)
>         at org.codehaus.janino.UnitCompiler.access$13800(UnitCompiler.java:212)
>         at org.codehaus.janino.UnitCompiler$18$2$1.visitFieldAccess(UnitCompiler.java:6082)
>         at org.codehaus.janino.UnitCompiler$18$2$1.visitFieldAccess(UnitCompiler.java:6077)
>         at org.codehaus.janino.Java$FieldAccess.accept(Java.java:4136)
>         at org.codehaus.janino.UnitCompiler$18$2.visitLvalue(UnitCompiler.java:6077)
>         at org.codehaus.janino.UnitCompiler$18$2.visitLvalue(UnitCompiler.java:6073)
>         at org.codehaus.janino.Java$Lvalue.accept(Java.java:3974)
>         at org.codehaus.janino.UnitCompiler$18.visitRvalue(UnitCompiler.java:6073)
>         at org.codehaus.janino.UnitCompiler$18.visitRvalue(UnitCompiler.java:6052)
>         at org.codehaus.janino.Java$Rvalue.accept(Java.java:3942)
>         at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6052)
>         at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6438)
>         at org.codehaus.janino.UnitCompiler.access$13600(UnitCompiler.java:212)
>         at org.codehaus.janino.UnitCompiler$18$2$1.visitAmbiguousName(UnitCompiler.java:6080)
>         at org.codehaus.janino.UnitCompiler$18$2$1.visitAmbiguousName(UnitCompiler.java:6077)
>         at org.codehaus.janino.Java$AmbiguousName.accept(Java.java:4050)
>         at org.codehaus.janino.UnitCompiler$18$2.visitLvalue(UnitCompiler.java:6077)
>         at org.codehaus.janino.UnitCompiler$18$2.visitLvalue(UnitCompiler.java:6073)
>         at org.codehaus.janino.Java$Lvalue.accept(Java.java:3974)
>         at org.codehaus.janino.UnitCompiler$18.visitRvalue(UnitCompiler.java:6073)
>         at org.codehaus.janino.UnitCompiler$18.visitRvalue(UnitCompiler.java:6052)
>         at org.codehaus.janino.Java$Rvalue.accept(Java.java:3942)
> {code}
> anyone can help me?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)