You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by 韩宁宁 <45...@qq.com> on 2017/10/23 11:01:47 UTC

Incompatible types of expression and result type.

Dear All,    I have a question about  TableSource.
    I defined a TableSource By StreamTableSource,then register a table and execute a query.the sql as "select f0 from myTable". final,turn the result table to DataStream.
The following error occurred in execution and how to solve?
Exception in thread "main" org.apache.flink.table.codegen.CodeGenException: Incompatible types of expression and result type.
	at org.apache.flink.table.codegen.CodeGenerator$$anonfun$generateResultExpression$2.apply(CodeGenerator.scala:966)
	at org.apache.flink.table.codegen.CodeGenerator$$anonfun$generateResultExpression$2.apply(CodeGenerator.scala:964)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)

    MyCode as follows:
    ======================================
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnvironment = TableEnvironment.getTableEnvironment(environment);

    tableEnvironment.registerTableSource("myTable",new MyTableSource());
    String sql = "select f0 from myTable";
    Table sqlResult = tableEnvironment.sql(sql);

    DataStream<Tuple2<Boolean,String>> result = tableEnvironment.toRetractStream(sqlResult,String.class);
    result.print();

    environment.execute();
}package com.xiaoju.manhattan.fbi.data.calc.source;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;

public class MyTableSource implements StreamTableSource<Row>{
    @Override
    public TypeInformation<Row> getReturnType() {
        TypeInformation<Row> typeInformation = new RowTypeInfo(Types.STRING,Types.STRING,Types.STRING);//Types.STRING,Types.STRING,Types.STRING
        return typeInformation;
    }

    @Override
    public String explainSource() {
        return "";
    }

    @Override
    public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {

        DataStream<Row> dataStream = execEnv.addSource(new SourceFunction<Row>() {
            private long count = 0L;
            private volatile boolean isRunning = true;
            private String str = "{\"ak\":\"av\",\"bk\":\"bv\",\"ck\":\"cv\"}";
            @Override
            public void run(SourceContext<Row> ctx) throws Exception {
                while (isRunning && count < 10){
                    synchronized (ctx.getCheckpointLock()){
                        ObjectMapper objectMapper = new ObjectMapper();
                        JsonNode jsonNode = objectMapper.readTree(str);
                        Row row = new Row(jsonNode.size()-1);
                        for(int i=0;i<jsonNode.size();i++){
                            row.setField(i,jsonNode.get(i));
                        }
                        ctx.collect(row);
                        count++;
                    }
                }
            }

            @Override
            public void cancel() {
                isRunning = false;
            }
        });

        return dataStream;
    }
}

Re: Incompatible types of expression and result type.

Posted by Timo Walther <tw...@apache.org>.
Hi,

I could found the problem in your implementation. The Table API program 
is correct. However, the DataStream program that you construct in your 
TableSource has a wrong type. When ever you use a Row type, you need to 
specify the type either by implementing ResultTypeQueryable or in your 
can by supplying the info in the second parameter.


DataStream<Row> dataStream = execEnv.addSource(new SourceFunction<Row>() {
     @Override public void run(SourceContext<Row> ctx)throws Exception {
     }

     @Override public void cancel() {
     }
}, Types.ROW(Types.STRING(),Types.STRING(),Types.STRING()));


Otherwise your SourceFunction will have a generic black box type that 
can not be accessed by the Table API.

Regards,
Timo



Am 10/23/17 um 1:01 PM schrieb 韩宁宁:
> StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tableEnvironment = TableEnvironment.getTableEnvironment(environment); tableEnvironment.registerTableSource("myTable",new MyTableSource()); String sql ="select f0 from myTable"; Table sqlResult = tableEnvironment.sql(sql); DataStream<Tuple2<Boolean,String>> result = tableEnvironment.toRetractStream(sqlResult,String.class); result.print(); environment.execute();