Posted to user-zh@flink.apache.org by liu_mingzhang <li...@163.com> on 2019/06/17 08:07:18 UTC

UDTF function

Here is the UDTF I wrote:
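// Imports are assumed; the original post does not show them.
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.api.Types
import org.apache.flink.table.functions.TableFunction
import org.apache.flink.types.Row

class split2rows_PA01CH extends TableFunction[Row] {

  // Splits two comma-separated fields and emits one row per position.
  def eval(ReportNo: String, field1: String, field2: String): Unit = {
    val field1rows = field1.split(",", -1)
    val field2rows = field2.split(",", -1)
    for (x <- 0 until field1rows.length) {
      // Four fields (indices 0 to 3), matching the four names in getResultType.
      val row = new Row(4)
      row.setField(0, ReportNo)
      row.setField(1, (x + 1).toString)
      row.setField(2, field1rows(x))
      row.setField(3, field2rows(x))
      collect(row)
    }
  }

  // Declares the names and types of the four output fields.
  override def getResultType: TypeInformation[Row] = {
    Types.ROW(
      Array("ReportNo", "ItemNo", "PA01CD01", "PA01CI01"),
      Array(Types.STRING, Types.STRING, Types.STRING, Types.STRING).asInstanceOf[Array[TypeInformation[_]]])
  }

}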
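
To make the intended output concrete, here is a minimal sketch of the splitting logic in eval with the Flink parts removed. The object name SplitRowsSketch, the helper splitToRows, and the sample values are hypothetical and only for illustration; tuples stand in for Row, and the result is returned instead of being passed to collect.

```scala
object SplitRowsSketch {

  // Mirrors the loop in eval: one output tuple per position in field1.
  // Like the original, it assumes field2 has at least as many items as field1.
  def splitToRows(reportNo: String, field1: String, field2: String): Seq[(String, String, String, String)] = {
    // The -1 limit keeps trailing empty strings, so "a,b," yields three items.
    val field1Rows = field1.split(",", -1)
    val field2Rows = field2.split(",", -1)
    field1Rows.indices.map { x =>
      (reportNo, (x + 1).toString, field1Rows(x), field2Rows(x))
    }
  }

  def main(args: Array[String]): Unit = {
    // Sample input (made up): three positions, the last one empty in field1.
    splitToRows("R001", "a,b,", "1,2,3").foreach(println)
  }
}
```

Each emitted tuple has four values, which is why the Row in the UDTF needs four fields.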






But when I run it, it unexpectedly fails with this error:
Exception in thread "main" java.lang.RuntimeException: The Nothing type cannot have a serializer.
	at org.apache.flink.api.common.typeinfo.NothingTypeInfo.createSerializer(NothingTypeInfo.java:74)
	at org.apache.flink.api.java.typeutils.RowTypeInfo.createSerializer(RowTypeInfo.java:240)
	at org.apache.flink.table.runtime.types.CRowTypeInfo.createSerializer(CRowTypeInfo.scala:57)
	at org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:207)
	at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transformOneInputTransform(StreamGraphGenerator.java:539)
	at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:166)
	at org.apache.flink.streaming.api.graph.StreamGraphGenerator.generateInternal(StreamGraphGenerator.java:132)
	at org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:124)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1538)
	at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:89)
	at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
	at com.aibank.personbank.udtf.test$.main(test.scala:42)
	at com.aibank.personbank.udtf.test.main(test.scala)




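While debugging, I found that when createSerializer is executed, no RowTypeInfo has been built for the output Row (see attachment). Is there something wrong with my UDTF? Isn't getResultType exactly what is supposed to build the RowTypeInfo for the Row?
I would be very grateful for any help. Many thanks!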