Posted to user@flink.apache.org by Flavio Pompermaier <po...@okkam.it> on 2020/02/12 16:15:05 UTC
Flink Table API
Hi to all,
I was trying out the new Table API with the new Blink planner, but I
found that the two planners do not expose exactly the same APIs. For
example, I can no longer go back and forth between Tables and
DataSet/DataStream (using tableEnv.toDataSet, for example).
Is this temporary behaviour, or will this functionality be removed sooner
or later (since, from what I understood, the current Flink planner will be
replaced by the Blink one)?
I like the idea of creating sources using just properties, but then I can't
transform the resulting table into a DataSet/DataStream.
Also, the documentation seems quite messy about this transition
phase...
This is my test program:
public static void main(String[] args) throws Exception {
    // set up the batch execution environment
    EnvironmentSettings bbSettings =
        EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
    TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);

    String createTableSql = "CREATE TABLE civici (\n" //
        + "\t`X` DOUBLE,\n" //
        + "\t`Y` DOUBLE\n" //
        + ") WITH (\n"//
        + "\t'connector.type' = 'filesystem',\n"//
        + "\t'connector.path' = 'file:///tmp/test.csv',\n"//
        + "\t'format.type' = 'csv',\n" //
        + "\t'format.fields.0.name' = 'X',\n"//
        + "\t'format.fields.0.data-type' = 'DOUBLE',\n" //
        + "\t'format.fields.1.name' = 'Y',\n"//
        + "\t'format.fields.1.data-type' = 'DOUBLE',\n"//
        + "\t'format.field-delimiter' = ',',\n"//
        // + "\t'format.line-delimiter' = '\n',\n"//
        + "\t'format.quote-character' = '\"',\n"//
        // +"\t'format.allow-comments' = 'true',"//
        // +"\t'format.ignore-parse-errors' = 'true',"//
        + "\t'format.ignore-first-line' = 'true'\n"//
        // +"\t'format.array-element-delimiter' = '|', "//
        // +"\t'format.escape-character' = '\\',"//
        // +"\t'format.null-literal' = 'n/a'"//
        + ")";
    System.out.println(createTableSql);
    bbTableEnv.sqlUpdate(createTableSql);

    Table table = bbTableEnv.sqlQuery("SELECT * FROM civici");
    List<Row> rows = TableUtils.collectToList(table);

    for (Row row : rows) {
        System.out.println(row);
    }
}
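
As a side note on "creating sources using just properties": the DDL string in the program above could also be assembled from a plain map of properties rather than by hand-written concatenation. The following is only a minimal sketch under stated assumptions. `CsvTableDdl` and `build` are hypothetical names, not part of Flink, and doubling single quotes inside property values is an assumed escaping rule. It uses only the JDK and does not touch any Flink API; the resulting string would still be passed to `sqlUpdate` as in the program above.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper (not part of Flink): renders a CREATE TABLE statement
// from a column map and a connector/format property map.
final class CsvTableDdl {

    static String build(String table, Map<String, String> columns, Map<String, String> props) {
        // One "`name` TYPE" entry per column, in insertion order.
        StringJoiner cols = new StringJoiner(",\n");
        columns.forEach((name, type) -> cols.add("\t`" + name + "` " + type));

        // One "'key' = 'value'" entry per property.
        // Assumption: a single quote inside a value is escaped by doubling it.
        StringJoiner with = new StringJoiner(",\n");
        props.forEach((key, value) ->
            with.add("\t'" + key + "' = '" + value.replace("'", "''") + "'"));

        return "CREATE TABLE " + table + " (\n" + cols + "\n) WITH (\n" + with + "\n)";
    }

    public static void main(String[] args) {
        Map<String, String> columns = new LinkedHashMap<>();
        columns.put("X", "DOUBLE");
        columns.put("Y", "DOUBLE");

        Map<String, String> props = new LinkedHashMap<>();
        props.put("connector.type", "filesystem");
        props.put("connector.path", "file:///tmp/test.csv");
        props.put("format.type", "csv");
        // Derive the format.fields.N.* entries from the column map.
        int i = 0;
        for (Map.Entry<String, String> column : columns.entrySet()) {
            props.put("format.fields." + i + ".name", column.getKey());
            props.put("format.fields." + i + ".data-type", column.getValue());
            i++;
        }
        props.put("format.field-delimiter", ",");
        props.put("format.quote-character", "\"");
        props.put("format.ignore-first-line", "true");

        System.out.println(build("civici", columns, props));
    }
}
```

The maps are `LinkedHashMap`s so that columns and properties are emitted in insertion order, which keeps the `format.fields.N.*` indices aligned with the column order.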
Best,
Flavio
Re: Flink Table API
Posted by Jark Wu <im...@gmail.com>.
Hi Flavio,
There are two main entry points in the Table API: `TableEnvironment`
and `StreamTableEnvironment`.
- `TableEnvironment` is used for pure Table API & SQL programs.
- `StreamTableEnvironment` can be used to convert from/to DataStream.
These two interfaces will be kept, but `BatchTableEnvironment`
will be removed in the future.
Currently, it's true that the Blink planner in batch mode can't convert
a Table into a DataStream, because `StreamTableEnvironment#create`
only accepts streaming mode. But this is on the roadmap: once DataStream
supports a set of batch operations, I think we can
make `StreamTableEnvironment` support batch mode.
If you want to convert into a DataStream in Blink planner batch mode, there
is a tricky workaround: create a
`StreamTableEnvironment` via the `StreamTableEnvironmentImpl` constructor.
The SQL CLI does it this way; you can take it as an example [1].
Best,
Jark
[1]:
https://github.com/apache/flink/blob/master/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java#L413