Posted to issues@flink.apache.org by "Fabian Hueske (JIRA)" <ji...@apache.org> on 2017/10/17 07:20:00 UTC

[jira] [Commented] (FLINK-7852) An input of GenericTypeInfo cannot be converted to Table

    [ https://issues.apache.org/jira/browse/FLINK-7852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16207110#comment-16207110 ] 

Fabian Hueske commented on FLINK-7852:
--------------------------------------

Hi Han,

as the error message says, you need to specify a {{RowTypeInfo}} to define the schema of the {{Row}} objects created by the {{MapFunction}}.
You can do that as follows:
{code}
DataStream<Row> dataStreamRow = dataStreamJson.map(new MapFunction<JsonNode, Row>() {
        ...
    }).returns(new RowTypeInfo(Types.STRING, Types.STRING, ...)); // Add as many fields as your Row has
{code}

Alternatively, the {{MapFunction}} can also implement the {{ResultTypeQueryable}} interface.
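
A minimal sketch of that alternative, assuming Flink and Jackson are on the classpath. The class name {{JsonToRowMapper}}, the Jackson import, and the three field names (taken from the sample JSON in the report) are assumptions for illustration, not part of the original code:

```java
import com.fasterxml.jackson.databind.JsonNode; // assumption: the report does not show which JsonNode is imported
import java.util.Arrays;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;

// Hypothetical mapper: declares its own output type, so no .returns(...) call is needed.
class JsonToRowMapper implements MapFunction<JsonNode, Row>, ResultTypeQueryable<Row> {

    // Assumed fixed schema, based on the sample record in the report.
    private static final String[] FIELD_NAMES = {"name", "age", "sex"};

    @Override
    public Row map(JsonNode node) {
        Row row = new Row(FIELD_NAMES.length);
        for (int i = 0; i < FIELD_NAMES.length; i++) {
            JsonNode value = node.get(FIELD_NAMES[i]);
            // A field missing from the JSON object becomes null in the Row.
            row.setField(i, value == null ? null : value.asText());
        }
        return row;
    }

    @Override
    public TypeInformation<Row> getProducedType() {
        // Every field is read with asText(), so every column is a String.
        TypeInformation<?>[] types = new TypeInformation<?>[FIELD_NAMES.length];
        Arrays.fill(types, BasicTypeInfo.STRING_TYPE_INFO);
        return new RowTypeInfo(types, FIELD_NAMES);
    }
}
```

With such a class, {{dataStreamJson.map(new JsonToRowMapper())}} should yield a stream typed with the {{RowTypeInfo}} above instead of {{GenericTypeInfo<Row>}}.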

Note that the schema of {{Row}} is fixed and not dynamic, i.e., all rows must have the same schema (number of fields and field types).
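
To illustrate the fixed-schema constraint independently of Flink, here is a plain Java sketch that projects parsed fields onto a fixed field list instead of iterating over whatever keys a record happens to contain. {{FixedSchemaProjector}} and its field list are hypothetical, and an {{Object[]}} stands in for {{Row}}:

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Flink: projects parsed fields onto a fixed schema.
class FixedSchemaProjector {

    // Assumed schema, taken from the sample JSON in the report.
    static final List<String> FIELD_NAMES = Arrays.asList("name", "age", "sex");

    // Returns one value per schema field, in schema order.
    // Fields absent from the input become null; keys outside the schema are ignored.
    static Object[] project(Map<String, String> parsed) {
        Object[] values = new Object[FIELD_NAMES.size()];
        for (int i = 0; i < values.length; i++) {
            values[i] = parsed.get(FIELD_NAMES.get(i));
        }
        return values;
    }

    public static void main(String[] args) {
        Map<String, String> record = new LinkedHashMap<>();
        record.put("age", "28");
        record.put("name", "name-value");
        record.put("extra", "ignored");
        // prints [name-value, 28, null]
        System.out.println(Arrays.toString(project(record)));
    }
}
```

Because the arity and field order come from the schema rather than from each record, every result has the same shape, which is what a {{RowTypeInfo}} describes.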

Please also note that JIRA is used for bug reports and not for user questions.
Please reach out to the [user mailing list|http://flink.apache.org/community.html#mailing-lists] for further questions.

> An input of GenericTypeInfo<Row> cannot be converted to Table
> -------------------------------------------------------------
>
>                 Key: FLINK-7852
>                 URL: https://issues.apache.org/jira/browse/FLINK-7852
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>    Affects Versions: 1.3.2
>            Reporter: hanningning
>
> Dear All:
>    I'm starting to learn about Flink, and I have a question about the Table API & SQL, described below. I would much appreciate your help as soon as possible.
>    I tried to convert a stream into a table. The stream's initial element type is String; I converted it to Row with the map method and then tried to convert the resulting Row-typed DataStream to a Table, but I got an error. The details follow:
> =================The error msg=======================================
> Exception in thread "main" org.apache.flink.table.api.TableException: An input of GenericTypeInfo<Row> cannot be converted to Table. Please specify the type of the input with a RowTypeInfo.
> 	at org.apache.flink.table.api.TableEnvironment.getFieldInfo(TableEnvironment.scala:620)
> 	at org.apache.flink.table.api.StreamTableEnvironment.registerDataStreamInternal(StreamTableEnvironment.scala:398)
> 	at org.apache.flink.table.api.java.StreamTableEnvironment.fromDataStream(StreamTableEnvironment.scala:88)
> 	at com.xiaoju.manhattan.fbi.data.calc.test.TableDynamicRowTest.main(TableDynamicRowTest.java:85)
> In addition, my code is below:
> ========================My Code==================================
> {code}
> public static void main(String[] args) throws Exception {
>     StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
>     environment.getConfig().disableSysoutLogging();
>     StreamTableEnvironment tableEnvironment = TableEnvironment.getTableEnvironment(environment);
>     DataStream<String> dataStream = environment.addSource(new SourceFunction<String>() {
>         private String str1 = "{\"name\":\"name-value\",\"age\":\"28\",\"sex\":\"1\"}";
>         private long count = 0L;
>         private volatile boolean isRunning = true;
>         @Override
>         public void run(SourceContext<String> ctx) throws Exception {
>             while (isRunning && count<2){
>                 synchronized (ctx.getCheckpointLock()){
>                     ctx.collect(str1);
>                     count++;
>                 }
>             }
>         }
>         @Override
>         public void cancel() {
>             isRunning = false;
>         }
>     });
>     DataStream<JsonNode> dataStreamJson = dataStream.map(new MapFunction<String, JsonNode>() {
>         @Override
>         public JsonNode map(String s) throws Exception {
>             ObjectMapper objectMapper = new ObjectMapper();
>             JsonNode node = objectMapper.readTree(s);
>             return node;
>         }
>     });
>     DataStream<Row> dataStreamRow = dataStreamJson.map(new MapFunction<JsonNode, Row>() {
>         @Override
>         public Row map(JsonNode jsonNode) throws Exception {
>             int pos = 0;
>             Row row = new Row(jsonNode.size());
>             Iterator<String> iterator = jsonNode.fieldNames();
>             while (iterator.hasNext()){
>                 String key = iterator.next();
>                 row.setField(pos,jsonNode.get(key).asText());
>                 pos++;
>             }
>             return row;
>         }
>     });
>     dataStreamRow.addSink(new SinkFunction<Row>() {
>         @Override
>         public void invoke(Row value) throws Exception {
>             System.out.println(value.getField(0));
>         }
>     });
>     Table myTable = tableEnvironment.fromDataStream(dataStreamRow);
>     Table result = myTable.select("f0");
>     DataStream<String> dataStreamResult = tableEnvironment.toAppendStream(result,String.class);
>     dataStreamResult.print();
>     environment.execute();
> }
> {code}
> Looking forward to your early reply, thanks.
> Best Wishes,
> Han



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)