Posted to issues@flink.apache.org by "Fabian Hueske (JIRA)" <ji...@apache.org> on 2017/10/17 07:12:00 UTC

[jira] [Updated] (FLINK-7852) An input of GenericTypeInfo cannot be converted to Table

     [ https://issues.apache.org/jira/browse/FLINK-7852?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Fabian Hueske updated FLINK-7852:
---------------------------------
    Description: 
Dear All:
   I'm starting to learn Flink, and I have a question about the Table API & SQL. Your help would be much appreciated.
   I tried to convert a stream into a table. The stream initially carries String records, which I convert to Row in a map function; I then convert the resulting DataStream<Row> to a Table, but I get the following error:
=================The error msg=======================================
Exception in thread "main" org.apache.flink.table.api.TableException: An input of GenericTypeInfo<Row> cannot be converted to Table. Please specify the type of the input with a RowTypeInfo.
	at org.apache.flink.table.api.TableEnvironment.getFieldInfo(TableEnvironment.scala:620)
	at org.apache.flink.table.api.StreamTableEnvironment.registerDataStreamInternal(StreamTableEnvironment.scala:398)
	at org.apache.flink.table.api.java.StreamTableEnvironment.fromDataStream(StreamTableEnvironment.scala:88)
	at com.xiaoju.manhattan.fbi.data.calc.test.TableDynamicRowTest.main(TableDynamicRowTest.java:85)

In addition, my code is below:
========================My Code==================================
{code}
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
    environment.getConfig().disableSysoutLogging();
    StreamTableEnvironment tableEnvironment = TableEnvironment.getTableEnvironment(environment);
    DataStream<String> dataStream = environment.addSource(new SourceFunction<String>() {
        private String str1 = "{\"name\":\"name-value\",\"age\":\"28\",\"sex\":\"1\"}";
        private long count = 0L;
        private volatile boolean isRunning = true;
        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            while (isRunning && count < 2) {
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(str1);
                    count++;
                }
            }
        }
        @Override
        public void cancel() {
            isRunning = false;
        }
    });
    DataStream<JsonNode> dataStreamJson = dataStream.map(new MapFunction<String, JsonNode>() {
        @Override
        public JsonNode map(String s) throws Exception {
            ObjectMapper objectMapper = new ObjectMapper();
            JsonNode node = objectMapper.readTree(s);
            return node;
        }
    });
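    // Note: Flink's type extraction cannot determine the field types of Row from the
    // anonymous MapFunction below, so dataStreamRow ends up typed as GenericTypeInfo<Row>.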
    DataStream<Row> dataStreamRow = dataStreamJson.map(new MapFunction<JsonNode, Row>() {
        @Override
        public Row map(JsonNode jsonNode) throws Exception {
            int pos = 0;
            Row row = new Row(jsonNode.size());
            Iterator<String> iterator = jsonNode.fieldNames();
            while (iterator.hasNext()) {
                String key = iterator.next();
                row.setField(pos, jsonNode.get(key).asText());
                pos++;
            }

            return row;
        }
    });

    dataStreamRow.addSink(new SinkFunction<Row>() {
        @Override
        public void invoke(Row value) throws Exception {
            System.out.println(value.getField(0));
        }
    });

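    // The TableException in the stack trace above is thrown by this call, because
    // dataStreamRow carries GenericTypeInfo<Row> rather than a RowTypeInfo.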
    Table myTable = tableEnvironment.fromDataStream(dataStreamRow);

    Table result = myTable.select("f0");

    DataStream<String> dataStreamResult = tableEnvironment.toAppendStream(result, String.class);

    dataStreamResult.print();

    environment.execute();
}
{code}
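
From the exception message I understand that the Row type has to be declared explicitly, for example by calling returns() with a RowTypeInfo on the mapper. Below is only a rough sketch of what I think that means (the field names and String types are assumptions taken from my sample JSON; my real schema is dynamic, which is exactly why I am asking):

{code}
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;

// Sketch only: declare the Row schema up front so the mapped stream carries a
// RowTypeInfo instead of GenericTypeInfo<Row>. Field names and types are assumed
// from the sample JSON ("name", "age", "sex", all as String).
RowTypeInfo rowType = new RowTypeInfo(
        new TypeInformation<?>[] {
                BasicTypeInfo.STRING_TYPE_INFO,
                BasicTypeInfo.STRING_TYPE_INFO,
                BasicTypeInfo.STRING_TYPE_INFO },
        new String[] {"name", "age", "sex"});

DataStream<Row> typedRows = dataStreamJson
        .map(new MapFunction<JsonNode, Row>() {
            @Override
            public Row map(JsonNode jsonNode) throws Exception {
                Row row = new Row(3);
                row.setField(0, jsonNode.get("name").asText());
                row.setField(1, jsonNode.get("age").asText());
                row.setField(2, jsonNode.get("sex").asText());
                return row;
            }
        })
        .returns(rowType);  // overrides the extracted GenericTypeInfo<Row>

Table myTable = tableEnvironment.fromDataStream(typedRows);
{code}

If I understand correctly, the table should then expose the fields by name, but this only works when the schema is known in advance.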

Looking forward to your reply. Thanks.
Best Wishes,
Han

> An input of GenericTypeInfo<Row> cannot be converted to Table
> -------------------------------------------------------------
>
>                 Key: FLINK-7852
>                 URL: https://issues.apache.org/jira/browse/FLINK-7852
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>    Affects Versions: 1.3.2
>            Reporter: hanningning
>


--
This message was sent by Atlassian JIRA
(v6.4.14#64029)