Posted to user@flink.apache.org by 韩宁宁 <45...@qq.com> on 2017/10/23 03:16:55 UTC

TypeInformation in the Table API & SQL

Dear All,
     My name is Han. I'm very interested in your advanced Flink system, and I'm learning it.
     I'm writing to this group with a question. I tried the Table API & SQL and registered a TableSource with KafkaJsonTableSource, and I have to say it works very well. My code is below.
My question is about the TypeInformation. The first parameter of Types.ROW() is an array of Strings naming the fields in the Kafka records, and the second parameter gives the type of each field. In the code below, new String[]{"a","b","c"} is the first parameter and new TypeInformation<?>[]{Types.STRING(),Types.STRING(),Types.STRING()} is the second.
If I don't know the names of the fields in the Kafka records before registering the TableSource, in other words, if the fields are dynamic, how can I solve this problem?

// Schema of the JSON records: field names and the type of each field.
TypeInformation<Row> typeInfo = Types.ROW(
        new String[]{"a", "b", "c"},
        new TypeInformation<?>[]{Types.STRING(), Types.STRING(), Types.STRING()}
);

// Table source reading JSON records from the Kafka 0.10 topic.
KafkaJsonTableSource kafkaTableSource = new Kafka010JsonTableSource(eltResultTopic,
        inputProperties, typeInfo);
kafkaTableSource.setFailOnMissingField(false); // missing fields become NULL instead of failing

tableEnvironment.registerTableSource("kafkaSource", kafkaTableSource);
Table sqlResult = tableEnvironment.sql("SELECT a FROM kafkaSource");
DataStream<String> dataStream = tableEnvironment.toAppendStream(sqlResult, String.class);
dataStream.print();

environment.execute();
Looking forward to your reply. Thanks.
Best Wishes,
Han

Re: TypeInformation in the Table API & SQL

Posted by Timo Walther <tw...@apache.org>.
Hi Han,

Generally, Flink is a strongly typed system. I think the easiest way to
handle a dynamic schema is to read each JSON record as a single String.
You can then implement your own ScalarFunction (or, in this case, also a
TableFunction) [1] and use any JSON parsing library in that function to
preprocess the data into a common representation.
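
A minimal sketch of such a scalar function, assuming the topic is
registered as a single STRING column (called "payload" here just for
illustration) and using Jackson for the parsing:

import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

// Illustrative UDF: extracts one field from a JSON string by name.
public class JsonExtract extends ScalarFunction {

    private transient ObjectMapper mapper;

    @Override
    public void open(FunctionContext context) throws Exception {
        // ObjectMapper is not serializable, so create it on the worker.
        mapper = new ObjectMapper();
    }

    public String eval(String json, String fieldName) {
        try {
            JsonNode node = mapper.readTree(json).get(fieldName);
            return node == null ? null : node.asText();
        } catch (Exception e) {
            return null; // treat malformed records as NULL
        }
    }
}

You would register and use it like this:

tableEnvironment.registerFunction("jsonExtract", new JsonExtract());
Table result = tableEnvironment.sql(
        "SELECT jsonExtract(payload, 'a') FROM kafkaSource");

This keeps the table schema fixed (one String column) while the fields
inside the JSON can vary from record to record.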

I hope this helps.

Regards,
Timo


[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/table/udfs.html

