Posted to user@flink.apache.org by 韩宁宁 <45...@qq.com> on 2017/10/23 11:01:47 UTC
Incompatible types of expression and result type.
Dear all, I have a question about TableSource.
I defined a TableSource by implementing StreamTableSource, then registered it as a table and executed the query "select f0 from myTable". Finally, I converted the result table to a DataStream.
The following error occurred during execution. How can I solve it?
Exception in thread "main" org.apache.flink.table.codegen.CodeGenException: Incompatible types of expression and result type.
at org.apache.flink.table.codegen.CodeGenerator$$anonfun$generateResultExpression$2.apply(CodeGenerator.scala:966)
at org.apache.flink.table.codegen.CodeGenerator$$anonfun$generateResultExpression$2.apply(CodeGenerator.scala:964)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
My code is as follows:
======================================
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnvironment = TableEnvironment.getTableEnvironment(environment);
    tableEnvironment.registerTableSource("myTable", new MyTableSource());
    String sql = "select f0 from myTable";
    Table sqlResult = tableEnvironment.sql(sql);
    DataStream<Tuple2<Boolean, String>> result = tableEnvironment.toRetractStream(sqlResult, String.class);
    result.print();
    environment.execute();
}

package com.xiaoju.manhattan.fbi.data.calc.source;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;
public class MyTableSource implements StreamTableSource<Row> {
    @Override
    public TypeInformation<Row> getReturnType() {
        TypeInformation<Row> typeInformation = new RowTypeInfo(Types.STRING, Types.STRING, Types.STRING); //Types.STRING,Types.STRING,Types.STRING
        return typeInformation;
    }

    @Override
    public String explainSource() {
        return "";
    }

    @Override
    public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
        DataStream<Row> dataStream = execEnv.addSource(new SourceFunction<Row>() {
            private long count = 0L;
            private volatile boolean isRunning = true;
            private String str = "{\"ak\":\"av\",\"bk\":\"bv\",\"ck\":\"cv\"}";

            @Override
            public void run(SourceContext<Row> ctx) throws Exception {
                while (isRunning && count < 10) {
                    synchronized (ctx.getCheckpointLock()) {
                        ObjectMapper objectMapper = new ObjectMapper();
                        JsonNode jsonNode = objectMapper.readTree(str);
                        Row row = new Row(jsonNode.size() - 1);
                        for (int i = 0; i < jsonNode.size(); i++) {
                            row.setField(i, jsonNode.get(i));
                        }
                        ctx.collect(row);
                        count++;
                    }
                }
            }

            @Override
            public void cancel() {
                isRunning = false;
            }
        });
        return dataStream;
    }
}
Re: Incompatible types of expression and result type.
Posted by Timo Walther <tw...@apache.org>.
Hi,
I found the problem in your implementation. The Table API program
is correct. However, the DataStream program that you construct in your
TableSource has the wrong type. Whenever you use a Row type, you need to
specify the type information explicitly, either by implementing
ResultTypeQueryable or, in your case, by supplying it as the second
parameter of addSource:
DataStream<Row> dataStream = execEnv.addSource(new SourceFunction<Row>() {
    @Override
    public void run(SourceContext<Row> ctx) throws Exception {
    }

    @Override
    public void cancel() {
    }
}, Types.ROW(Types.STRING(), Types.STRING(), Types.STRING()));
Otherwise your SourceFunction will have a generic black-box type that
cannot be accessed by the Table API.
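To illustrate why the type ends up as a black box, here is a minimal
pure-Java sketch. It does not use Flink: Row and Source below are
hypothetical stand-ins, and producedType is an illustrative helper, not
Flink's actual type extraction. It only shows that reflection on an
anonymous class can recover the class Row, but nothing about how many
fields a row has or what their types are.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

public class RowTypeErasureSketch {
    // Hypothetical stand-in for a row type: fields are plain Objects,
    // so the class itself carries no per-field type information.
    static final class Row {
        final Object[] fields;

        Row(int arity) {
            fields = new Object[arity];
        }
    }

    // Hypothetical stand-in for a source interface parameterized by its output type.
    interface Source<T> {
        T next();
    }

    // Returns the type argument that a direct implementation passed to Source<T>,
    // as visible through reflection.
    static Type producedType(Source<?> source) {
        for (Type t : source.getClass().getGenericInterfaces()) {
            if (t instanceof ParameterizedType
                    && ((ParameterizedType) t).getRawType() == Source.class) {
                return ((ParameterizedType) t).getActualTypeArguments()[0];
            }
        }
        throw new IllegalArgumentException("not a direct Source implementation");
    }

    public static void main(String[] args) {
        Source<Row> source = new Source<Row>() {
            @Override
            public Row next() {
                return new Row(3);
            }
        };
        // Reflection sees the class Row ...
        System.out.println(producedType(source) == Row.class);
        // ... but Row declares no type parameters that could describe its fields.
        System.out.println(Row.class.getTypeParameters().length);
    }
}
```

The field types of a row exist only in whatever type information is
passed alongside it, which is why it has to be supplied explicitly.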
Regards,
Timo
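For reference, the other option mentioned in the reply, implementing
ResultTypeQueryable, might look roughly like the sketch below. The class
name TypedRowSource and the fixed field values are assumptions made for
illustration; the sketch needs the Flink streaming dependencies on the
classpath and has not been checked against a specific Flink version.

```java
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.types.Row;

// Hypothetical source that emits ten fixed three-field rows and
// reports its own row type instead of relying on type extraction.
public class TypedRowSource implements SourceFunction<Row>, ResultTypeQueryable<Row> {
    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<Row> ctx) throws Exception {
        for (int count = 0; isRunning && count < 10; count++) {
            // Illustrative values only; a real source would read its input here.
            Row row = new Row(3);
            row.setField(0, "av");
            row.setField(1, "bv");
            row.setField(2, "cv");
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(row);
            }
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    // Describes each row as three String fields.
    @Override
    public TypeInformation<Row> getProducedType() {
        return new RowTypeInfo(
                BasicTypeInfo.STRING_TYPE_INFO,
                BasicTypeInfo.STRING_TYPE_INFO,
                BasicTypeInfo.STRING_TYPE_INFO);
    }
}
```

With a source like this, execEnv.addSource(new TypedRowSource()) should
pick up the row type without a second argument.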
On 10/23/17 at 1:01 PM, 韩宁宁 wrote:
> StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
> StreamTableEnvironment tableEnvironment = TableEnvironment.getTableEnvironment(environment);
> tableEnvironment.registerTableSource("myTable",new MyTableSource());
> String sql ="select f0 from myTable";
> Table sqlResult = tableEnvironment.sql(sql);
> DataStream<Tuple2<Boolean,String>> result = tableEnvironment.toRetractStream(sqlResult,String.class);
> result.print();
> environment.execute();