Posted to user@flink.apache.org by po...@gmx.com on 2022/07/11 08:15:46 UTC
Is it possible to save Table to CSV?
If I create a dynamic table with:
CREATE TABLE some_table (name STRING, score INT)
WITH (
'format' = 'csv',
'...'
);
//do some other stuff here
Then how do I save the table result to a CSV file?
Best,
Mike
Re: Re: Is it possible to save Table to CSV?
Posted by po...@gmx.com.
No. But I do not want any 'parquet'. I need CSV.
Which code should create the file?
EnvironmentSettings settings = EnvironmentSettings
        .newInstance()
        //.inStreamingMode()
        .inBatchMode()
        .build();
final TableEnvironment tEnv = TableEnvironment.create(settings);
tEnv.executeSql("CREATE TABLE some_table (column_name1 STRING, column_name2 DOUBLE) WITH ('connector.type' = 'filesystem',
//This is to create file?:
// 'Create table' means 'create file'? Car is a bicycle?
tEnv.executeSql("CREATE TABLE fs_table ("
        + " column_name1 STRING, \n"
        + " column_name2 DOUBLE \n"
        + " ) WITH ( \n"
        + " 'connector'='filesystem', \n"
        + " 'path'='file:///C:/temp/test5.txt', \n"
        + " 'format'='csv' \n"
        + " )");
tEnv.executeSql("INSERT INTO fs_table SELECT column_name1, column_name2 from some_table");
It does not work anyway
**Sent:** Monday, July 11, 2022 at 5:23 PM
**From:** "Xuyang" <xy...@163.com>
**To:** podunk@gmx.com
**Cc:** user@flink.apache.org
**Subject:** Re:Re: Is it possible to save Table to CSV?
Hi, did you add the parquet dependency [1]?
[1] <https://mvnrepository.com/artifact/org.apache.flink/flink-sql-parquet>
Re:Re: Is it possible to save Table to CSV?
Posted by Xuyang <xy...@163.com>.
Hi, did you add the parquet dependency [1]?
[1] https://mvnrepository.com/artifact/org.apache.flink/flink-sql-parquet
At 2022-07-11 18:33:38, podunk@gmx.com wrote:
This example?:
CREATE TABLE kafka_table (
  user_id STRING,
  order_amount DOUBLE,
  log_ts TIMESTAMP(3),
  WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND
) WITH (...);
CREATE TABLE fs_table (
  user_id STRING,
  order_amount DOUBLE,
  dt STRING,
  `hour` STRING
) PARTITIONED BY (dt, `hour`) WITH (
  'connector'='filesystem',
  'path'='...',
  'format'='parquet',
  'sink.partition-commit.delay'='1 h',
  'sink.partition-commit.policy.kind'='success-file'
);
-- streaming sql, insert into file system table
INSERT INTO fs_table
SELECT
  user_id,
  order_amount,
  DATE_FORMAT(log_ts, 'yyyy-MM-dd'),
  DATE_FORMAT(log_ts, 'HH')
FROM kafka_table;
-- batch sql, select with partition pruning
SELECT * FROM fs_table WHERE dt='2020-05-20' and `hour`='12';
CREATE TABLE kafka_table - creates table, not file
CREATE TABLE fs_table - creates table, not file
INSERT INTO fs_table - inserts into table from another table
I do not see any create file section.
Anyway:
Caused by: org.apache.flink.table.api.ValidationException: Could not find any format factory for identifier 'parquet' in the classpath.
Re: Is it possible to save Table to CSV?
Posted by po...@gmx.com.
This example?:
CREATE TABLE kafka_table (
user_id STRING,
order_amount DOUBLE,
log_ts TIMESTAMP(3),
WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND
) WITH (...);
CREATE TABLE fs_table (
user_id STRING,
order_amount DOUBLE,
dt STRING,
`hour` STRING
) PARTITIONED BY (dt, `hour`) WITH (
'connector'='filesystem',
'path'='...',
'format'='parquet',
'sink.partition-commit.delay'='1 h',
'sink.partition-commit.policy.kind'='success-file'
);
-- streaming sql, insert into file system table
INSERT INTO fs_table
SELECT
user_id,
order_amount,
DATE_FORMAT(log_ts, 'yyyy-MM-dd'),
DATE_FORMAT(log_ts, 'HH')
FROM kafka_table;
-- batch sql, select with partition pruning
SELECT * FROM fs_table WHERE dt='2020-05-20' and `hour`='12';
CREATE TABLE kafka_table - creates table, not file
CREATE TABLE fs_table - creates table, not file
INSERT INTO fs_table - inserts into table from another table
I do not see any create file section.
Anyway:
Caused by: org.apache.flink.table.api.ValidationException: Could not find any
format factory for identifier 'parquet' in the classpath.
**Sent:** Monday, July 11, 2022 at 10:31 AM
**From:** "Lijie Wang" <wa...@gmail.com>
**To:** podunk@gmx.com
**Cc:** user@flink.apache.org
**Subject:** Re: Is it possible to save Table to CSV?
You can use the FileSink and set the format to csv. An example of FileSink:
<https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/filesystem/#full-example>
Best,
Lijie
Re: Is it possible to save Table to CSV?
Posted by Lijie Wang <wa...@gmail.com>.
You can use the FileSink and set the format to csv. An example of FileSink:
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/filesystem/#full-example
Best,
Lijie
On Mon, Jul 11, 2022 at 16:16, <po...@gmx.com> wrote:
>
> If I create a dynamic table with:
>
> CREATE TABLE some_table (name STRING, score INT)
> WITH (
> 'format' = 'csv',
> '...'
> );
>
> //do some other stuff here
>
> Then how do I save the table result to a CSV file?
>
> Best,
>
> Mike
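
For readers following the thread, here is a minimal Flink SQL sketch of the approach Lijie describes. It assumes the filesystem connector and the CSV format are available on the classpath. The sink name csv_sink and both paths are hypothetical, and the source definition is only one possible way to fill in the elided options of some_table. CREATE TABLE only registers metadata (connector, path, format) and writes nothing. Output appears once the INSERT INTO job runs. The 'path' option is treated as a directory, so the result is expected to be one or more part files under that directory, not a single file with that exact name.

```sql
-- Hypothetical source: CSV files in an existing directory (path is an assumption).
CREATE TABLE some_table (
  name  STRING,
  score INT
) WITH (
  'connector' = 'filesystem',
  'path'      = 'file:///tmp/input',
  'format'    = 'csv'
);

-- Sink definition: declares where and how rows will be written.
-- Nothing is created on disk by this statement.
CREATE TABLE csv_sink (
  name  STRING,
  score INT
) WITH (
  'connector' = 'filesystem',
  'path'      = 'file:///tmp/output',  -- assumed location, used as a directory
  'format'    = 'csv'
);

-- Submitting this statement starts the job that writes CSV part files
-- under the sink path.
INSERT INTO csv_sink
SELECT name, score FROM some_table;
```

When the INSERT is submitted from Java with tEnv.executeSql(...), the call returns after the job has been submitted. Calling await() on the returned TableResult is one way to keep a local program running until the write has finished.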