You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by Cody Innowhere <e....@gmail.com> on 2016/06/22 12:58:49 UTC

UDF in Flink table API

Hi guys,
I'm trying to add a UDF to the Flink Table API, say, to the DataSet Table API.
My example code is as follows:
---------------------------
/**
 * Minimal batch Table API example that tries to call a user-defined scalar
 * function (`MyAdd1`) from a SQL query.
 *
 * The function is added directly to the default schema obtained from the table
 * environment's framework config.
 */
object WordCountTable {

  /** A word paired with an integer; also used as the row type of the query result. */
  case class WC(word: String, num: Int)

  /**
   * Builds a three-element data set, registers it as table "WC", adds `MyAdd1`
   * to the default schema, then runs a SQL query over the table and prints the result.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {

    // set up execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // sample rows, converted to a Table and registered under the name "WC"
    val input = env.fromElements(WC("hello", 1), WC("hello", 2), WC("ciao", 3))
    val table = input.toTable(tEnv)

    tEnv.registerTable("WC", table)

    // alternative registration attempt through the function catalog, left disabled
    //val funcCatalog = tEnv.getFunctionCatalog
    //funcCatalog.registerFunction("MyAdd1", classOf[MyAdds.MyAdd1])

    // add the "eval" method of MyAdds.MyAdd1 to the default schema as "MyAdd1"
    val schema = tEnv.getFrameworkConfig.getDefaultSchema
    schema.add("MyAdd1", ScalarFunctionImpl.create(classOf[MyAdds.MyAdd1], "eval"))

    // the query is kept on one line: a plain "..." literal cannot span lines in Scala
    tEnv
      .sql("SELECT word, MyAdd1(num) as num FROM WC WHERE num > 1")
      .toDataSet[WC]
      .print()
  }
}

And MyAdds looks like this:
---------------------------
/**
 * Holder class for the example's user-defined functions.
 */
public class MyAdds {

    /**
     * Function class whose {@code eval} method yields its argument plus one.
     */
    public static class MyAdd1 {

        /**
         * Adds one to the given value.
         *
         * @param x the input value
         * @return {@code x + 1}
         */
        public int eval(int x) {
            final int incremented = 1 + x;
            return incremented;
        }
    }
}

But when running, it gives the error:
Exception in thread "main" org.apache.calcite.tools.ValidationException:
org.apache.calcite.runtime.CalciteContextException: From line 1, column 14
to line 1, column 24: No match found for function signature
MyAdd1(<NUMERIC>)
at
org.apache.flink.api.table.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:99)
at
org.apache.flink.api.table.BatchTableEnvironment.sql(BatchTableEnvironment.scala:130)
at
org.apache.flink.examples.scala.WordCountTable$.main(WordCountTable.scala:54)
at org.apache.flink.examples.scala.WordCountTable.main(WordCountTable.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1,
column 14 to line 1, column 24: No match found for function signature
MyAdd1(<NUMERIC>)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at
org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:405)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:765)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:753)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:3929)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.handleUnresolvedFunction(SqlValidatorImpl.java:1544)
at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:278)
at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:222)
at
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:4266)
at
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:4253)
at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:135)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1462)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1445)
at org.apache.calcite.sql.SqlAsOperator.deriveType(SqlAsOperator.java:132)
at
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:4266)
at
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:4253)
at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:135)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1462)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1445)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:439)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelectList(SqlValidatorImpl.java:3428)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:2966)
at
org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
at
org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:86)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:845)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:831)
at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:208)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:807)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:523)
at
org.apache.flink.api.table.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:95)
... 8 more
Caused by: org.apache.calcite.sql.validate.SqlValidatorException: No match
found for function signature MyAdd1(<NUMERIC>)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at
org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:405)
at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:514)
... 36 more


I'm not sure if this is the right way to add a UDF to the Table API, or am I
missing something? Thanks~

Re: UDF in Flink table API

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Cody,

actually, there is currently no functionality to add a UDF in the Table API / SQL.
This feature is definitely on the roadmap but is not implemented yet. The
related JIRA is FLINK-3097.

In fact, the FunctionCatalog and FrameworkConfig should not be publicly
accessible from the TableEnvironment, IMO.

Best, Fabian

2016-06-22 14:58 GMT+02:00 Cody Innowhere <e....@gmail.com>:

> Hi guys,
> I'm trying to add a UDF in Flink table API, say, in DataSet table API.
> My example code is as follows:
> ---------------------------
> object WordCountTable {
>   case class WC(word: String, num: Int)
>
>   def main(args: Array[String]): Unit = {
>
>     // set up execution environment
>     val env = ExecutionEnvironment.getExecutionEnvironment
>     val tEnv = TableEnvironment.getTableEnvironment(env)
>
>     val input = env.fromElements(WC("hello", 1), WC("hello", 2), WC("ciao",
> 3))
>     val table = input.toTable(tEnv)
>
>     tEnv.registerTable("WC", table)
>
>     //val funcCatalog = tblEnv.getFunctionCatalog
>     //funcCatalog.registerFunction("MyAdd1", classOf[MyAdds.MyAdd1])
>
>     val schema = tEnv.getFrameworkConfig.getDefaultSchema
>     schema.add("MyAdd1", ScalarFunctionImpl.create(classOf[MyAdds.MyAdd1],
> "eval"))
>
>     tEnv.sql("SELECT word, MyAdd1(num) as num FROM WC WHERE num >
> 1").toDataSet[WC].print()
>   }
> }
>
> And MyAdds looks like this:
> ---------------------------
> public class MyAdds {
>     public static class MyAdd1 {
>         public int eval(int x) {
>             return x + 1;
>         }
>     }
> }
>
> But when running, it gives the error:
> Exception in thread "main" org.apache.calcite.tools.ValidationException:
> org.apache.calcite.runtime.CalciteContextException: From line 1, column 14
> to line 1, column 24: No match found for function signature
> MyAdd1(<NUMERIC>)
> at
>
> org.apache.flink.api.table.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:99)
> at
>
> org.apache.flink.api.table.BatchTableEnvironment.sql(BatchTableEnvironment.scala:130)
> at
>
> org.apache.flink.examples.scala.WordCountTable$.main(WordCountTable.scala:54)
> at
> org.apache.flink.examples.scala.WordCountTable.main(WordCountTable.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at
>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
> Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1,
> column 14 to line 1, column 24: No match found for function signature
> MyAdd1(<NUMERIC>)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at
>
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
> at
>
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
> at
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:405)
> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:765)
> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:753)
> at
>
> org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:3929)
> at
>
> org.apache.calcite.sql.validate.SqlValidatorImpl.handleUnresolvedFunction(SqlValidatorImpl.java:1544)
> at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:278)
> at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:222)
> at
>
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:4266)
> at
>
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:4253)
> at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:135)
> at
>
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1462)
> at
>
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1445)
> at org.apache.calcite.sql.SqlAsOperator.deriveType(SqlAsOperator.java:132)
> at
>
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:4266)
> at
>
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:4253)
> at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:135)
> at
>
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1462)
> at
>
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1445)
> at
>
> org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:439)
> at
>
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelectList(SqlValidatorImpl.java:3428)
> at
>
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:2966)
> at
>
> org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
> at
>
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:86)
> at
>
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:845)
> at
>
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:831)
> at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:208)
> at
>
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:807)
> at
>
> org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:523)
> at
>
> org.apache.flink.api.table.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:95)
> ... 8 more
> Caused by: org.apache.calcite.sql.validate.SqlValidatorException: No match
> found for function signature MyAdd1(<NUMERIC>)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at
>
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
> at
>
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
> at
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:405)
> at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:514)
> ... 36 more
>
>
> I'm not sure if it's the right way to add UDF in table API, or am I missing
> something? Thanks~
>