You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by Cody Innowhere <e....@gmail.com> on 2016/06/22 12:58:49 UTC
UDF in Flink table API
Hi guys,
I'm trying to add a UDF to the Flink Table API, say, to the DataSet Table API.
My example code is as follows:
---------------------------
/**
 * Minimal batch example that tries to call a user-defined scalar function
 * (`MyAdd1`) from a SQL query over a small in-memory data set.
 */
object WordCountTable {

  /** A word together with an integer value; also used as the result row type. */
  case class WC(word: String, num: Int)

  /**
   * Builds a three-element data set, registers it as table "WC", adds `MyAdd1`
   * to the default schema and prints the result of a SQL query that calls it.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {

    // set up execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    val input = env.fromElements(WC("hello", 1), WC("hello", 2), WC("ciao", 3))
    val table = input.toTable(tEnv)

    tEnv.registerTable("WC", table)

    //val funcCatalog = tblEnv.getFunctionCatalog
    //funcCatalog.registerFunction("MyAdd1", classOf[MyAdds.MyAdd1])

    // Register MyAdd1.eval as a scalar function on the framework config's default schema.
    val schema = tEnv.getFrameworkConfig.getDefaultSchema
    schema.add("MyAdd1", ScalarFunctionImpl.create(classOf[MyAdds.MyAdd1], "eval"))

    // The SQL text must be a single-line string literal; a line break inside the
    // quotes leaves the literal unterminated and the snippet does not compile.
    // This sql(...) call is the frame in main from which the reported
    // ValidationException ("No match found for function signature") is thrown.
    val result = tEnv.sql("SELECT word, MyAdd1(num) as num FROM WC WHERE num > 1")
    result.toDataSet[WC].print()
  }
}
And MyAdds looks like this:
---------------------------
/**
 * Holder class for the function referenced as {@code MyAdds.MyAdd1} by the
 * WordCountTable example.
 */
public class MyAdds {

    /** Function class exposing a single {@code eval} method. */
    public static class MyAdd1 {

        /**
         * Adds one to the argument.
         *
         * @param x the input value
         * @return {@code x + 1}
         */
        public int eval(int x) {
            final int incremented = 1 + x;
            return incremented;
        }
    }
}
But when running, it gives the error:
Exception in thread "main" org.apache.calcite.tools.ValidationException:
org.apache.calcite.runtime.CalciteContextException: From line 1, column 14
to line 1, column 24: No match found for function signature
MyAdd1(<NUMERIC>)
at
org.apache.flink.api.table.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:99)
at
org.apache.flink.api.table.BatchTableEnvironment.sql(BatchTableEnvironment.scala:130)
at
org.apache.flink.examples.scala.WordCountTable$.main(WordCountTable.scala:54)
at org.apache.flink.examples.scala.WordCountTable.main(WordCountTable.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1,
column 14 to line 1, column 24: No match found for function signature
MyAdd1(<NUMERIC>)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at
org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:405)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:765)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:753)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:3929)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.handleUnresolvedFunction(SqlValidatorImpl.java:1544)
at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:278)
at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:222)
at
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:4266)
at
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:4253)
at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:135)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1462)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1445)
at org.apache.calcite.sql.SqlAsOperator.deriveType(SqlAsOperator.java:132)
at
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:4266)
at
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:4253)
at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:135)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1462)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1445)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:439)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelectList(SqlValidatorImpl.java:3428)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:2966)
at
org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
at
org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:86)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:845)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:831)
at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:208)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:807)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:523)
at
org.apache.flink.api.table.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:95)
... 8 more
Caused by: org.apache.calcite.sql.validate.SqlValidatorException: No match
found for function signature MyAdd1(<NUMERIC>)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at
org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:405)
at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:514)
... 36 more
I'm not sure whether this is the right way to add a UDF in the Table API, or am I
missing something? Thanks~
Re: UDF in Flink table API
Posted by Fabian Hueske <fh...@gmail.com>.
Hi Cody,
actually, there is no functionality to add a UDF in the Table API / SQL.
This feature is definitely on the roadmap but not implemented yet. The
related JIRA is FLINK-3097.
In fact, the FunctionCatalog and FrameworkConfig should not be publicly
accessible from the TableEnvironment, IMO.
Best, Fabian
2016-06-22 14:58 GMT+02:00 Cody Innowhere <e....@gmail.com>:
> Hi guys,
> I'm trying to add a UDF in Flink table API, say, in DataSet table API.
> My example code is as follows:
> ---------------------------
> object WordCountTable {
> case class WC(word: String, num: Int)
>
> def main(args: Array[String]): Unit = {
>
> // set up execution environment
> val env = ExecutionEnvironment.getExecutionEnvironment
> val tEnv = TableEnvironment.getTableEnvironment(env)
>
> val input = env.fromElements(WC("hello", 1), WC("hello", 2), WC("ciao",
> 3))
> val table = input.toTable(tEnv)
>
> tEnv.registerTable("WC", table)
>
> //val funcCatalog = tblEnv.getFunctionCatalog
> //funcCatalog.registerFunction("MyAdd1", classOf[MyAdds.MyAdd1])
>
> val schema = tEnv.getFrameworkConfig.getDefaultSchema
> schema.add("MyAdd1", ScalarFunctionImpl.create(classOf[MyAdds.MyAdd1],
> "eval"))
>
> tEnv.sql("SELECT word, MyAdd1(num) as num FROM WC WHERE num >
> 1").toDataSet[WC].print()
> }
> }
>
> And MyAdds looks like this:
> ---------------------------
> public class MyAdds {
> public static class MyAdd1 {
> public int eval(int x) {
> return x + 1;
> }
> }
> }
>
> But when running, it gives the error:
> Exception in thread "main" org.apache.calcite.tools.ValidationException:
> org.apache.calcite.runtime.CalciteContextException: From line 1, column 14
> to line 1, column 24: No match found for function signature
> MyAdd1(<NUMERIC>)
> at
>
> org.apache.flink.api.table.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:99)
> at
>
> org.apache.flink.api.table.BatchTableEnvironment.sql(BatchTableEnvironment.scala:130)
> at
>
> org.apache.flink.examples.scala.WordCountTable$.main(WordCountTable.scala:54)
> at
> org.apache.flink.examples.scala.WordCountTable.main(WordCountTable.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at
>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
> Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1,
> column 14 to line 1, column 24: No match found for function signature
> MyAdd1(<NUMERIC>)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at
>
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
> at
>
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
> at
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:405)
> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:765)
> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:753)
> at
>
> org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:3929)
> at
>
> org.apache.calcite.sql.validate.SqlValidatorImpl.handleUnresolvedFunction(SqlValidatorImpl.java:1544)
> at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:278)
> at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:222)
> at
>
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:4266)
> at
>
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:4253)
> at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:135)
> at
>
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1462)
> at
>
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1445)
> at org.apache.calcite.sql.SqlAsOperator.deriveType(SqlAsOperator.java:132)
> at
>
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:4266)
> at
>
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:4253)
> at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:135)
> at
>
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1462)
> at
>
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1445)
> at
>
> org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:439)
> at
>
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelectList(SqlValidatorImpl.java:3428)
> at
>
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:2966)
> at
>
> org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
> at
>
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:86)
> at
>
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:845)
> at
>
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:831)
> at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:208)
> at
>
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:807)
> at
>
> org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:523)
> at
>
> org.apache.flink.api.table.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:95)
> ... 8 more
> Caused by: org.apache.calcite.sql.validate.SqlValidatorException: No match
> found for function signature MyAdd1(<NUMERIC>)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at
>
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
> at
>
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
> at
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:405)
> at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:514)
> ... 36 more
>
>
> I'm not sure if it's the right way to add UDF in table API, or am I missing
> something? Thanks~
>