Posted to user@flink.apache.org by Flavio Pompermaier <po...@okkam.it> on 2017/08/31 16:53:15 UTC
Help with table UDF
Hi all,
I'm using Flink 1.3.1 and I'm trying to register a UDF, but something is wrong.
I always get the following exception:
java.lang.UnsupportedOperationException: org.apache.flink.table.expressions.TableFunctionCall cannot be transformed to RexNode
	at org.apache.flink.table.expressions.Expression.toRexNode(Expression.scala:53)
	at org.apache.flink.table.expressions.Alias.toRexNode(fieldExpression.scala:79)
	at org.apache.flink.table.plan.logical.Project$$anonfun$construct$1.apply(operators.scala:94)
	at org.apache.flink.table.plan.logical.Project$$anonfun$construct$1.apply(operators.scala:94)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
	at scala.collection.immutable.List.foreach(List.scala:318)
	at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
	at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:45)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
	at scala.collection.AbstractTraversable.map(Traversable.scala:105)
	at org.apache.flink.table.plan.logical.Project.construct(operators.scala:94)
	at org.apache.flink.table.plan.logical.LogicalNode.toRelNode(LogicalNode.scala:77)
	at org.apache.flink.table.api.Table.getRelNode(table.scala:94)
	at
-------------------------------------------------
This is my program:

final ExecutionEnvironment env = DatalinksExecutionEnvironment.getExecutionEnv();
final BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

DataSet<Row> dataSet = env.fromElements("{\"test\":\"val\"}")
    .map(new MapFunction<String, Row>() {
      @Override
      public Row map(String value) throws Exception {
        Row ret = new Row(1);
        ret.setField(0, value);
        return ret;
      }
    }).returns(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO));

tEnv.registerFunction("myFunc", new MyTableFunction());
Table test = tEnv.fromDataSet(dataSet, "field1");
Table res = test.select("field1,myFunc(recon)");
dataSet = tEnv.toDataSet(res,
    new RowTypeInfo(
        BasicTypeInfo.STRING_TYPE_INFO,
        BasicTypeInfo.STRING_TYPE_INFO));
dataSet.print();
MyTableFunction is something like:

public class MyTableFunction extends TableFunction<String> {
  public String eval(String str) {
    return "XXX";
  }
}
What am I doing wrong here?
Thanks in advance,
Flavio
Re: Help with table UDF
Posted by Fabian Hueske <fh...@gmail.com>.
Hi Flavio,
you're not using the TableFunction correctly. The documentation shows how
to call it with the join() method.
But I agree, the error message should be better.
Best, Fabian
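For readers of the archive, a minimal sketch of what Fabian describes, assuming the Flink 1.3 Table API: a TableFunction's eval() has a void return type and emits rows via collect(), and a registered table function is applied with join() rather than inside select(). The names myFunc, field1, and the alias s are taken from or added to the original program for illustration; this is an untested sketch, not a verified fix.

```java
// A TableFunction emits zero or more rows per input via collect();
// it does not return a value from eval().
public class MyTableFunction extends TableFunction<String> {
  public void eval(String str) {
    collect("XXX");
  }
}

// The table function is invoked in a join(), not in a select():
tEnv.registerFunction("myFunc", new MyTableFunction());
Table test = tEnv.fromDataSet(dataSet, "field1");
Table res = test.join("myFunc(field1) as s").select("field1, s");
```

The select() in the original program fails because a TableFunctionCall produces a table, not a scalar expression, so the planner cannot turn it into a RexNode.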
2017-08-31 18:53 GMT+02:00 Flavio Pompermaier <po...@okkam.it>:
> [quoted original message]