Posted to user@flink.apache.org by Flavio Pompermaier <po...@okkam.it> on 2017/08/31 16:53:15 UTC

Help with table UDF

Hi all,
I'm using Flink 1.3.1 and I'm trying to register a UDF, but something is
wrong.
I always get the following exception:

java.lang.UnsupportedOperationException: org.apache.flink.table.expressions.TableFunctionCall cannot be transformed to RexNode
at org.apache.flink.table.expressions.Expression.toRexNode(Expression.scala:53)
at org.apache.flink.table.expressions.Alias.toRexNode(fieldExpression.scala:79)
at org.apache.flink.table.plan.logical.Project$$anonfun$construct$1.apply(operators.scala:94)
at org.apache.flink.table.plan.logical.Project$$anonfun$construct$1.apply(operators.scala:94)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:45)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.flink.table.plan.logical.Project.construct(operators.scala:94)
at org.apache.flink.table.plan.logical.LogicalNode.toRelNode(LogicalNode.scala:77)
at org.apache.flink.table.api.Table.getRelNode(table.scala:94)
at

-------------------------------------------------
This is my Program:

final ExecutionEnvironment env = DatalinksExecutionEnvironment.getExecutionEnv();
final BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

DataSet<Row> dataSet = null;
dataSet = env.fromElements("{\"test\":\"val\"}").map(new MapFunction<String, Row>() {

  @Override
  public Row map(String value) throws Exception {
    Row ret = new Row(1);
    ret.setField(0, value);
    return ret;
  }
}).returns(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO));

tEnv.registerFunction("myFunc", new MyTableFunction());
Table test = tEnv.fromDataSet(dataSet, "field1");
Table res = test.select("field1,myFunc(recon)");
dataSet = tEnv.toDataSet(res,
    new RowTypeInfo(
        BasicTypeInfo.STRING_TYPE_INFO,
        BasicTypeInfo.STRING_TYPE_INFO));
dataSet.print();


MyTableFunction is something like:

public class MyTableFunction extends TableFunction<String> {
  public String eval(String str) {
    return "XXX";
  }
}


What am I doing wrong here?

Thanks in advance,
Flavio

Re: Help with table UDF

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Flavio,

you're not using the TableFunction correctly. The documentation shows how
to call it in a join() method.
But I agree, the error message should be better.
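
To illustrate the point about join(): a table function may emit zero or more
rows per input row, so its result has to be joined to the input table and
cannot simply be projected as a column in select(). The following is a
plain-Java model of that behaviour with no Flink dependency. The class and
method names (TableFunctionJoinSketch, scalarEval, tableEval, lateralJoin) are
made up for illustration and are not Flink APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch: models the semantics only, it does not use Flink.
class TableFunctionJoinSketch {

    /** Scalar-style function: exactly one output value per input value. */
    static String scalarEval(String value) {
        return "XXX";
    }

    /** Table-style function: zero or more output rows per input value. */
    static List<String> tableEval(String value) {
        return Arrays.asList(value.split(" "));
    }

    /**
     * Models joining a table with a table function: each input row is
     * paired with every row the function emits for it.
     */
    static List<List<String>> lateralJoin(List<String> rows,
                                          Function<String, List<String>> fn) {
        List<List<String>> out = new ArrayList<>();
        for (String row : rows) {
            for (String emitted : fn.apply(row)) {
                out.add(Arrays.asList(row, emitted));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> rows = Arrays.asList("a b", "c");
        // The first input row appears twice because the function emits two rows for it.
        System.out.println(lateralJoin(rows, TableFunctionJoinSketch::tableEval));
    }
}
```

In the program above, the corresponding change would presumably be something
along the lines of test.join("myFunc(field1) as res").select("field1, res"),
with eval() declared void and emitting its rows through collect(). If only one
value per input row is needed, extending ScalarFunction and keeping the
select() call is likely the simpler route. Please check the exact syntax
against the Table API documentation for your Flink version.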

Best, Fabian

