You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Fabian Hueske (JIRA)" <ji...@apache.org> on 2017/10/17 07:12:00 UTC
[jira] [Updated] (FLINK-7852) An input of GenericTypeInfo<Row> cannot be converted to Table
[ https://issues.apache.org/jira/browse/FLINK-7852?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Fabian Hueske updated FLINK-7852:
---------------------------------
Description:
Dear All:
I'm starting to learn about Flink, and I have a question about the Table API & SQL, described below. It would be much appreciated if you could help ASAP.
I tried to convert a stream into a table. The initial data type of this stream is String; I converted the String type to Row through the map method and then converted this Row-typed DataStream to a Table, but I got an error. The error details are as follows:
=================The error msg=======================================
Exception in thread "main" org.apache.flink.table.api.TableException: An input of GenericTypeInfo<Row> cannot be converted to Table. Please specify the type of the input with a RowTypeInfo.
at org.apache.flink.table.api.TableEnvironment.getFieldInfo(TableEnvironment.scala:620)
at org.apache.flink.table.api.StreamTableEnvironment.registerDataStreamInternal(StreamTableEnvironment.scala:398)
at org.apache.flink.table.api.java.StreamTableEnvironment.fromDataStream(StreamTableEnvironment.scala:88)
at com.xiaoju.manhattan.fbi.data.calc.test.TableDynamicRowTest.main(TableDynamicRowTest.java:85)
In addition, my code is below:
========================My Code==================================
{code}
/**
 * Reproduction program for the TableException quoted in this report.
 *
 * Pipeline: a source emits one fixed JSON string, a first map parses it into a
 * JsonNode, a second map copies the node's field values into a Row, and the
 * resulting DataStream<Row> is handed to the table environment.
 *
 * According to the stack trace in this report, the exception is raised from the
 * fromDataStream(...) call near the end of this method.
 */
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.getConfig().disableSysoutLogging();
StreamTableEnvironment tableEnvironment = TableEnvironment.getTableEnvironment(environment);
// Source: emits the same JSON document until two records were sent or cancel() is called.
DataStream<String> dataStream = environment.addSource(new SourceFunction<String>() {
private String str1 = "{\"name\":\"name-value\",\"age\":\"28\",\"sex\":\"1\"}";
private long count = 0L;
private volatile boolean isRunning = true;
@Override
public void run(SourceContext<String> ctx) throws Exception {
while (isRunning && count<2){
// Emission and counter update happen while holding the lock returned by getCheckpointLock().
synchronized (ctx.getCheckpointLock()){
ctx.collect(str1);
count++;
}
}
}
@Override
public void cancel() {
// Makes the loop in run() stop at its next condition check.
isRunning = false;
}
});
// Step 1: parse every incoming string into a JsonNode.
DataStream<JsonNode> dataStreamJson = dataStream.map(new MapFunction<String, JsonNode>() {
@Override
public JsonNode map(String s) throws Exception {
// Note: a new ObjectMapper is constructed for every record.
ObjectMapper objectMapper = new ObjectMapper();
JsonNode node = objectMapper.readTree(s);
return node;
}
});
// Step 2: turn every JsonNode into a Row with one position per JSON field.
// NOTE(review): no explicit type information is attached to this stream; the error
// message asks for a RowTypeInfo, so presumably it has to be supplied here — confirm
// against the Flink documentation.
DataStream<Row> dataStreamRow = dataStreamJson.map(new MapFunction<JsonNode, Row>() {
@Override
public Row map(JsonNode jsonNode) throws Exception {
int pos = 0;
// Row arity equals the number of entries reported by jsonNode.size().
Row row = new Row(jsonNode.size());
Iterator<String> iterator = jsonNode.fieldNames();
while (iterator.hasNext()){
String key = iterator.next();
// Values are stored as text, in the iteration order of fieldNames().
row.setField(pos,jsonNode.get(key).asText());
pos++;
}
return row;
}
});
// Debug sink: prints the first field of every Row.
dataStreamRow.addSink(new SinkFunction<Row>() {
@Override
public void invoke(Row value) throws Exception {
System.out.println(value.getField(0));
}
});
// Per the stack trace in this report, this call throws the TableException.
Table myTable = tableEnvironment.fromDataStream(dataStreamRow);
// NOTE(review): "f0" is presumably the default name of the first Row field — confirm.
Table result = myTable.select("f0");
DataStream<String> dataStreamResult = tableEnvironment.toAppendStream(result,String.class);
dataStreamResult.print();
environment.execute();
}
{code}
Looking forward to your early reply, thanks.
Best Wishes,
Han
was:
Dear All:
I'm starting to learn about Flink, and I have a question about the Table API & SQL, described below. It would be much appreciated if you could help ASAP.
I tried to convert a stream into a table. The initial data type of this stream is String; I converted the String type to Row through the map method and then converted this Row-typed DataStream to a Table, but I got an error. The error details are as follows:
=================The error msg=======================================
Exception in thread "main" org.apache.flink.table.api.TableException: An input of GenericTypeInfo<Row> cannot be converted to Table. Please specify the type of the input with a RowTypeInfo.
at org.apache.flink.table.api.TableEnvironment.getFieldInfo(TableEnvironment.scala:620)
at org.apache.flink.table.api.StreamTableEnvironment.registerDataStreamInternal(StreamTableEnvironment.scala:398)
at org.apache.flink.table.api.java.StreamTableEnvironment.fromDataStream(StreamTableEnvironment.scala:88)
at com.xiaoju.manhattan.fbi.data.calc.test.TableDynamicRowTest.main(TableDynamicRowTest.java:85)
In addition, my code is below:
========================My Code==================================
/**
 * Previous-description copy of the reproduction program for the TableException
 * quoted in this report.
 *
 * Pipeline: fixed JSON string source -> JsonNode -> Row -> Table. According to
 * the stack trace in this report, the exception is raised from the
 * fromDataStream(...) call near the end of this method.
 */
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.getConfig().disableSysoutLogging();
StreamTableEnvironment tableEnvironment = TableEnvironment.getTableEnvironment(environment);
// Source: emits the same JSON document until two records were sent or cancel() is called.
DataStream<String> dataStream = environment.addSource(new SourceFunction<String>() {
private String str1 = "{\"name\":\"name-value\",\"age\":\"28\",\"sex\":\"1\"}";
private long count = 0L;
private volatile boolean isRunning = true;
@Override
public void run(SourceContext<String> ctx) throws Exception {
while (isRunning && count<2){
// Emit and count while holding the lock returned by getCheckpointLock().
synchronized (ctx.getCheckpointLock()){
ctx.collect(str1);
count++;
}
}
}
@Override
public void cancel() {
// Makes the loop in run() stop at its next condition check.
isRunning = false;
}
});
// Step 1: parse every incoming string into a JsonNode.
DataStream<JsonNode> dataStreamJson = dataStream.map(new MapFunction<String, JsonNode>() {
@Override
public JsonNode map(String s) throws Exception {
// Note: a new ObjectMapper is constructed for every record.
ObjectMapper objectMapper = new ObjectMapper();
JsonNode node = objectMapper.readTree(s);
return node;
}
});
// Step 2: copy the text value of every JSON field into a Row of matching size.
DataStream<Row> dataStreamRow = dataStreamJson.map(new MapFunction<JsonNode, Row>() {
@Override
public Row map(JsonNode jsonNode) throws Exception {
int pos = 0;
Row row = new Row(jsonNode.size());
Iterator<String> iterator = jsonNode.fieldNames();
while (iterator.hasNext()){
String key = iterator.next();
row.setField(pos,jsonNode.get(key).asText());
pos++;
}
return row;
}
});
// Debug sink: prints the first field of every Row.
dataStreamRow.addSink(new SinkFunction<Row>() {
@Override
public void invoke(Row value) throws Exception {
System.out.println(value.getField(0));
}
});
// Per the stack trace in this report, this call throws the TableException.
Table myTable = tableEnvironment.fromDataStream(dataStreamRow);
Table result = myTable.select("f0");
DataStream<String> dataStreamResult = tableEnvironment.toAppendStream(result,String.class);
dataStreamResult.print();
environment.execute();
}
Looking forward to your early reply, thanks.
Best Wishes,
Han
> An input of GenericTypeInfo<Row> cannot be converted to Table
> -------------------------------------------------------------
>
> Key: FLINK-7852
> URL: https://issues.apache.org/jira/browse/FLINK-7852
> Project: Flink
> Issue Type: Bug
> Components: Table API & SQL
> Affects Versions: 1.3.2
> Reporter: hanningning
>
> Dear All:
> I'm starting to learn about Flink, and I have a question about the Table API & SQL, described below. It would be much appreciated if you could help ASAP.
> I tried to convert a stream into a table. The initial data type of this stream is String; I converted the String type to Row through the map method and then converted this Row-typed DataStream to a Table, but I got an error. The error details are as follows:
> =================The error msg=======================================
> Exception in thread "main" org.apache.flink.table.api.TableException: An input of GenericTypeInfo<Row> cannot be converted to Table. Please specify the type of the input with a RowTypeInfo.
> at org.apache.flink.table.api.TableEnvironment.getFieldInfo(TableEnvironment.scala:620)
> at org.apache.flink.table.api.StreamTableEnvironment.registerDataStreamInternal(StreamTableEnvironment.scala:398)
> at org.apache.flink.table.api.java.StreamTableEnvironment.fromDataStream(StreamTableEnvironment.scala:88)
> at com.xiaoju.manhattan.fbi.data.calc.test.TableDynamicRowTest.main(TableDynamicRowTest.java:85)
> In addition, my code is below:
> ========================My Code==================================
> {code}
> public static void main(String[] args) throws Exception { // reproduction program for the TableException quoted in this report
> StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
> environment.getConfig().disableSysoutLogging();
> StreamTableEnvironment tableEnvironment = TableEnvironment.getTableEnvironment(environment);
> DataStream<String> dataStream = environment.addSource(new SourceFunction<String>() { // source of fixed JSON strings
> private String str1 = "{\"name\":\"name-value\",\"age\":\"28\",\"sex\":\"1\"}";
> private long count = 0L;
> private volatile boolean isRunning = true;
> @Override
> public void run(SourceContext<String> ctx) throws Exception {
> while (isRunning && count<2){ // stops after two records or once cancel() was called
> synchronized (ctx.getCheckpointLock()){ // emit and count while holding the checkpoint lock object
> ctx.collect(str1);
> count++;
> }
> }
> }
> @Override
> public void cancel() {
> isRunning = false; // makes the loop in run() stop at its next condition check
> }
> });
> DataStream<JsonNode> dataStreamJson = dataStream.map(new MapFunction<String, JsonNode>() { // step 1: String -> JsonNode
> @Override
> public JsonNode map(String s) throws Exception {
> ObjectMapper objectMapper = new ObjectMapper(); // a new mapper is constructed for every record
> JsonNode node = objectMapper.readTree(s);
> return node;
> }
> });
> DataStream<Row> dataStreamRow = dataStreamJson.map(new MapFunction<JsonNode, Row>() { // step 2: JsonNode -> Row
> @Override
> public Row map(JsonNode jsonNode) throws Exception {
> int pos = 0;
> Row row = new Row(jsonNode.size()); // one Row position per JSON field
> Iterator<String> iterator = jsonNode.fieldNames();
> while (iterator.hasNext()){
> String key = iterator.next();
> row.setField(pos,jsonNode.get(key).asText()); // values stored as text, in fieldNames() order
> pos++;
> }
> return row;
> }
> });
> dataStreamRow.addSink(new SinkFunction<Row>() { // debug sink printing the first field of each Row
> @Override
> public void invoke(Row value) throws Exception {
> System.out.println(value.getField(0));
> }
> });
> Table myTable = tableEnvironment.fromDataStream(dataStreamRow); // per the stack trace in this report, this call throws the TableException
> Table result = myTable.select("f0");
> DataStream<String> dataStreamResult = tableEnvironment.toAppendStream(result,String.class);
> dataStreamResult.print();
> environment.execute();
> }
> {code}
> Looking forward to your early reply, thanks.
> Best Wishes,
> Han
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)